Logstash Crash with Persistent Queue and Kafka Input

I used to write all my Beats output directly to my Logstash nodes with the persistent queue enabled, with no issues. I recently updated my Elastic Stack architecture to include a Kafka cluster that all data flows through and that Logstash consumes from, so that multiple consumers can read the data.

Ever since switching to the kafka input, the Logstash persistent queue configuration has not worked. It runs for a few brief moments until Logstash comes to a halt and stops logging anything. The only output from the "crash" is a large stack trace in /var/log/messages, which I will try to post in a subsequent reply because of the character limit.

Changing the queue type back to memory is the current workaround, but I am not comfortable with that configuration long term: if Logstash runs into issues, the in-flight in-memory events are lost.
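
For reference, the consuming side of the new path (Beats -> Kafka -> Logstash with PQ -> Elasticsearch) would use the kafka input roughly as sketched below. The actual pipeline config is not included in this post; the broker, topic, and Elasticsearch host names are placeholders:

input {
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092"   # placeholder broker list
    topics            => ["beats"]                     # placeholder topic carrying the Beats data
    group_id          => "logstash"
    codec             => "json"
  }
}

output {
  elasticsearch {
    hosts => ["http://es01:9200"]                      # placeholder Elasticsearch host
  }
}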

First half of stack trace:

Exception in thread "[main]>worker79" org.logstash.ackedqueue.QueueRuntimeException: deserialize invocation error
at org.logstash.ackedqueue.Queue.deserialize(Queue.java:634)
at org.logstash.ackedqueue.Page.lambda$readBatch$0(Page.java:57)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.logstash.ackedqueue.Page.readBatch(Page.java:57)
at org.logstash.ackedqueue.Queue._readPageBatch(Queue.java:501)
at org.logstash.ackedqueue.Queue.readBatch(Queue.java:492)
at org.logstash.ackedqueue.ext.JrubyAckedQueueExtLibrary$RubyAckedQueue.ruby_read_batch(JrubyAckedQueueExtLibrary.java:165)
at org.logstash.ackedqueue.ext.JrubyAckedQueueExtLibrary$RubyAckedQueue$INVOKER$i$2$0$ruby_read_batch.call(JrubyAckedQueueExtLibrary$RubyAckedQueue$INVOKER$i$2$0$ruby_read_batch.gen)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)
at rubyjit.LogStash::Util::WrappedAckedQueue$$read_batch_d49228611d045c1563109c40e125e11f223ebb3d1028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:86)
at rubyjit.LogStash::Util::WrappedAckedQueue$$read_batch_d49228611d045c1563109c40e125e11f223ebb3d1028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb)
at org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:221)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)
at rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$read_next_35f44d806c5bc13a311ff64530bf70b917b790a01028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:257)
at rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$read_next_35f44d806c5bc13a311ff64530bf70b917b790a01028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb)
at org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:141)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:134)
at rubyjit.LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531028566121.block_0$RUBY$__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:176)
at rubyjit$LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531028566121$block_0$RUBY$__file__.call(rubyjit$LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531028566121$block_0$RUBY$__file__)
at org.jruby.runtime.CompiledBlock19.yield(CompiledBlock19.java:135)
at org.jruby.runtime.Block.yield(Block.java:142)
at org.jruby.ext.thread.Mutex.synchronize(Mutex.java:149)
at org.jruby.ext.thread.Mutex$INVOKER$i$0$0$synchronize.call(Mutex$INVOKER$i$0$0$synchronize.gen)
at org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)
at org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)
at rubyjit.LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:176)
at rubyjit.LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb)
at org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:141)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:134)
at rubyjit.LogStash::Pipeline$$worker_loop_bed164e81d3915bbde9314175e87568804b658df1028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:348)
at rubyjit.LogStash::Pipeline$$worker_loop_bed164e81d3915bbde9314175e87568804b658df1028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb)
at org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:221)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)
at org.jruby.ast.FCallTwoArgNode.interpret(FCallTwoArgNode.java:38)
at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
at org.jruby.ast.BlockNode.interpret(BlockNode.java:71)
at org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)
at org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)
at org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)
at org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)
at org.jruby.runtime.Block.call(Block.java:101)
at org.jruby.RubyProc.call(RubyProc.java:300)
at org.jruby.RubyProc.call(RubyProc.java:230)
at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:99)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.logstash.ackedqueue.Queue.deserialize(Queue.java:632)
... 50 more
Caused by: java.lang.IllegalArgumentException: Missing Valuefier handling for full class name=[B, simple name=byte[]
at org.logstash.Valuefier.convertNonCollection(Valuefier.java:51)
at org.logstash.Valuefier.convert(Valuefier.java:90)
at org.logstash.ConvertedMap.newFromMap(ConvertedMap.java:31)
at org.logstash.Valuefier.convert(Valuefier.java:69)
at org.logstash.Event.<init>(Event.java:66)
at org.logstash.Event.fromSerializableMap(Event.java:212)
at org.logstash.Event.deserialize(Event.java:399)
... 54 more
Caused by: java.lang.IllegalArgumentException: No enum constant org.logstash.bivalues.BiValues.[B
at java.lang.Enum.valueOf(Enum.java:238)
at org.logstash.bivalues.BiValues.valueOf(BiValues.java:20)
at org.logstash.bivalues.BiValues.newBiValue(BiValues.java:88)
at org.logstash.Valuefier.convertNonCollection(Valuefier.java:48)
... 60 more

Second half of stack trace:

at org.logstash.ackedqueue.Queue.deserialize(Queue.java:634)
at org.logstash.ackedqueue.Page.lambda$readBatch$0(Page.java:57)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.logstash.ackedqueue.Page.readBatch(Page.java:57)
at org.logstash.ackedqueue.Queue._readPageBatch(Queue.java:501)
at org.logstash.ackedqueue.Queue.readBatch(Queue.java:492)
at org.logstash.ackedqueue.ext.JrubyAckedQueueExtLibrary$RubyAckedQueue.ruby_read_batch(JrubyAckedQueueExtLibrary.java:165)
at org.logstash.ackedqueue.ext.JrubyAckedQueueExtLibrary$RubyAckedQueue$INVOKER$i$2$0$ruby_read_batch.call(JrubyAckedQueueExtLibrary$RubyAckedQueue$INVOKER$i$2$0$ruby_read_batch.gen)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)
at rubyjit.LogStash::Util::WrappedAckedQueue$$read_batch_d49228611d045c1563109c40e125e11f223ebb3d1028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:86)
at rubyjit.LogStash::Util::WrappedAckedQueue$$read_batch_d49228611d045c1563109c40e125e11f223ebb3d1028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb)
at org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:221)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)
at rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$read_next_35f44d806c5bc13a311ff64530bf70b917b790a01028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:257)
at rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$read_next_35f44d806c5bc13a311ff64530bf70b917b790a01028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb)
at org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:141)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:134)
at rubyjit.LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531028566121.block_0$RUBY$__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:176)
at rubyjit$LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531028566121$block_0$RUBY$__file__.call(rubyjit$LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531028566121$block_0$RUBY$__file__)
at org.jruby.runtime.CompiledBlock19.yield(CompiledBlock19.java:135)
at org.jruby.runtime.Block.yield(Block.java:142)
at org.jruby.ext.thread.Mutex.synchronize(Mutex.java:149)
at org.jruby.ext.thread.Mutex$INVOKER$i$0$0$synchronize.call(Mutex$INVOKER$i$0$0$synchronize.gen)
at org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)
at org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)
at rubyjit.LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:176)
at rubyjit.LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb)
at org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:141)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:134)
at rubyjit.LogStash::Pipeline$$worker_loop_bed164e81d3915bbde9314175e87568804b658df1028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:348)
at rubyjit.LogStash::Pipeline$$worker_loop_bed164e81d3915bbde9314175e87568804b658df1028566121.__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb)
at org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:221)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)
at org.jruby.ast.FCallTwoArgNode.interpret(FCallTwoArgNode.java:38)
at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
at org.jruby.ast.BlockNode.interpret(BlockNode.java:71)
at org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)
at org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)
at org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)
at org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)
at org.jruby.runtime.Block.call(Block.java:101)
at org.jruby.RubyProc.call(RubyProc.java:300)
at org.jruby.RubyProc.call(RubyProc.java:230)
at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:99)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.logstash.ackedqueue.Queue.deserialize(Queue.java:632)
... 50 more
Caused by: java.lang.IllegalArgumentException: Missing Valuefier handling for full class name=[B, simple name=byte[]
at org.logstash.Valuefier.convertNonCollection(Valuefier.java:51)
at org.logstash.Valuefier.convert(Valuefier.java:90)
at org.logstash.ConvertedMap.newFromMap(ConvertedMap.java:31)
at org.logstash.Valuefier.convert(Valuefier.java:69)
at org.logstash.Event.<init>(Event.java:66)
at org.logstash.Event.fromSerializableMap(Event.java:212)
at org.logstash.Event.deserialize(Event.java:399)
... 54 more
Caused by: java.lang.IllegalArgumentException: No enum constant org.logstash.bivalues.BiValues.[B
at java.lang.Enum.valueOf(Enum.java:238)
at org.logstash.bivalues.BiValues.valueOf(BiValues.java:20)
at org.logstash.bivalues.BiValues.newBiValue(BiValues.java:88)
at org.logstash.Valuefier.convertNonCollection(Valuefier.java:48)
... 60 more
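
The innermost cause in both halves is the key detail: [B is the JVM's internal name for byte[] (the trace itself says "simple name=byte[]"), so an event written to the persistent queue contains a raw Java byte array that the queue's deserializer (Valuefier / BiValues) cannot convert back into an event field. Because the persistent queue sits between the inputs and the filter stage, a filter cannot strip such a field before it is serialized; it has to be kept out of the event at the input. As a purely diagnostic sketch (not a confirmed fix, and these deserializer classes may already be the plugin defaults depending on the version), forcing string deserialization in the kafka input is one way to rule out byte arrays arriving via the Kafka record key or value:

input {
  kafka {
    bootstrap_servers        => "kafka01:9092"    # placeholder broker
    topics                   => ["beats"]         # placeholder topic
    # Hand Logstash strings rather than raw byte arrays for both key and value
    key_deserializer_class   => "org.apache.kafka.common.serialization.StringDeserializer"
    value_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
    decorate_events          => false             # default; keeps Kafka record metadata off the event
  }
}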

logstash.yml:

node.name: logstash01

path.data: /var/lib/logstash

pipeline.workers: 144
pipeline.output.workers: 1
pipeline.batch.size: 12
pipeline.batch.delay: 5
pipeline.unsafe_shutdown: false

path.config: /etc/logstash/conf.d
config.test_and_exit: false
config.reload.automatic: true
config.reload.interval: 10
config.debug: false

queue.type: persisted
path.queue: /var/lib/logstash/queue
queue.page_capacity: 250mb
queue.max_events: 0
queue.max_bytes: 2046mb
queue.checkpoint.acks: 1024
queue.checkpoint.writes: 1024
queue.checkpoint.interval: 1000

http.host: 127.0.0.1
http.port: 9600

log.format: plain
log.level: info
path.logs: /var/log/logstash

Apologies that we missed this question.

How are you getting your Beats data into Kafka? Directly, or via LS?
1)

Beats -> LS -> Kafka -> LS(PQ) -> ES

or 2)

Beats -> Kafka -> LS(PQ) -> ES

Also, which beats are you using?

See this comment for an explanation of the problem.
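
Regarding option 1 above: the front-line Logstash layer would produce to Kafka with a kafka output roughly like the sketch below (broker and topic names are placeholders), whereas option 2 has the Beats shippers writing to Kafka directly:

output {
  kafka {
    bootstrap_servers => "kafka01:9092"   # placeholder broker list
    topic_id          => "beats"          # placeholder topic
    codec             => "json"
  }
}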

Hey @guyboertje, sorry for the late reply. I used to do Beats > Logstash > ES with persistent queueing. Now I am doing Beats > Kafka > Logstash > ES, which is when persistent queueing started causing the problems described in the original post. Going back to memory queueing is the current workaround, although it is not preferred as a long-term solution.
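
For reference, the workaround described in this thread amounts to the following change in logstash.yml; memory is also the default when queue.type is not set:

queue.type: memory
# queue.type: persisted   # re-enable once the byte[] deserialization issue is resolved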
