Logstash Crash with Persistent Queue and Kafka Input

Apologies that we missed this question.

How are you getting your Beats data into Kafka: directly, or via Logstash (LS)?
1) Beats -> LS -> Kafka -> LS(PQ) -> ES

or

2) Beats -> Kafka -> LS(PQ) -> ES

Also, which Beats are you using?

See this comment for an explanation of the problem