Logstash 5.4 - Repeated deserialization errors and losing data with Persistent Queues

Hi, we are getting repeated deserialization errors with Persistent Queues.

Environment:

  • Logstash 5.4.1 (unmodified CentOS image from elastic repos)
  • 200GB SSD mounted disk for the queue
  • Persistent queues enabled

Logstash configuration:

pipeline.workers: 2
queue.type: persisted
queue.max_bytes: 150gb

We set no other queue-related config (apart from things like the HTTP bind address) and leave everything else at the Logstash defaults. The error starts happening while Logstash is running. Note that we run it in Docker containers, so there are no runtime config reloads.

And once the error starts, that's it: it persists even after restarting Logstash. The pipeline is blocked and all subsequent events start buffering, and removing any single checkpoint/page file does not help either (it leads to the same seqnum mismatch error as in "Must delete files in message queue to start logstash"). We have to drop all the buffered logs.

It is exactly the same root cause every time:

Caused by: java.lang.IllegalArgumentException: No enum constant org.logstash.bivalues.BiValues.[B

We're now restarting Logstash on a daily basis and losing logs with the PQ. We also have backups of the page/checkpoint files and can provide them if required. Can you please help?

Exception in thread "[main]>worker1" org.logstash.ackedqueue.QueueRuntimeException: deserialize invocation error
	at org.logstash.ackedqueue.Queue.deserialize(Queue.java:630)
	at org.logstash.ackedqueue.Page.lambda$readBatch$0(Page.java:57)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at org.logstash.ackedqueue.Page.readBatch(Page.java:57)
	at org.logstash.ackedqueue.Queue._readPageBatch(Queue.java:497)
	at org.logstash.ackedqueue.Queue.readBatch(Queue.java:488)
	at org.logstash.ackedqueue.ext.JrubyAckedQueueExtLibrary$RubyAckedQueue.ruby_read_batch(JrubyAckedQueueExtLibrary.java:165)
	at org.logstash.ackedqueue.ext.JrubyAckedQueueExtLibrary$RubyAckedQueue$INVOKER$i$2$0$ruby_read_batch.call(JrubyAckedQueueExtLibrary$RubyAckedQueue$INVOKER$i$2$0$ruby_read_batch.gen)
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)
	at rubyjit.LogStash::Util::WrappedAckedQueue$$read_batch_d49228611d045c1563109c40e125e11f223ebb3d1442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:86)
	at rubyjit.LogStash::Util::WrappedAckedQueue$$read_batch_d49228611d045c1563109c40e125e11f223ebb3d1442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb)
	at org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:221)
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)
	at rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$read_next_35f44d806c5bc13a311ff64530bf70b917b790a01442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:257)
	at rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$read_next_35f44d806c5bc13a311ff64530bf70b917b790a01442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb)
	at org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:141)
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:134)
	at rubyjit.LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531442407170.block_0$RUBY$__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:176)
	at rubyjit$LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531442407170$block_0$RUBY$__file__.call(rubyjit$LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531442407170$block_0$RUBY$__file__)
	at org.jruby.runtime.CompiledBlock19.yield(CompiledBlock19.java:135)
	at org.jruby.runtime.Block.yield(Block.java:142)
	at org.jruby.ext.thread.Mutex.synchronize(Mutex.java:149)
	at org.jruby.ext.thread.Mutex$INVOKER$i$0$0$synchronize.call(Mutex$INVOKER$i$0$0$synchronize.gen)
	at org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)
	at org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)
	at rubyjit.LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:176)
	at rubyjit.LogStash::Util::WrappedAckedQueue::ReadClient$$read_batch_bc834a631fd628792504b0d95b265bac58efdc531442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb)
	at org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:141)
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:134)
	at org.jruby.ast.CallNoArgNode.interpret(CallNoArgNode.java:60)
	at org.jruby.ast.LocalAsgnNode.interpret(LocalAsgnNode.java:123)
	at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
	at org.jruby.ast.BlockNode.interpret(BlockNode.java:71)
	at org.jruby.ast.WhileNode.interpret(WhileNode.java:131)
	at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
	at org.jruby.ast.BlockNode.interpret(BlockNode.java:71)
	at org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)
	at org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:225)
	at org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:219)
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)
	at org.jruby.ast.FCallTwoArgNode.interpret(FCallTwoArgNode.java:38)
	at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
	at org.jruby.ast.BlockNode.interpret(BlockNode.java:71)
	at org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)
	at org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)
	at org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)
	at org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)
	at org.jruby.runtime.Block.call(Block.java:101)
	at org.jruby.RubyProc.call(RubyProc.java:300)
	at org.jruby.RubyProc.call(RubyProc.java:230)
	at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:99)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.logstash.ackedqueue.Queue.deserialize(Queue.java:628)
	... 57 more

And the root cause is always the same:

Caused by: java.lang.IllegalArgumentException: Missing Valuefier handling for full class name=[B, simple name=byte[]
	at org.logstash.Valuefier.convertNonCollection(Valuefier.java:51)
	at org.logstash.Valuefier.convert(Valuefier.java:90)
	at org.logstash.ConvertedMap.newFromMap(ConvertedMap.java:31)
	at org.logstash.Valuefier.convert(Valuefier.java:69)
	at org.logstash.Event.<init>(Event.java:66)
	at org.logstash.Event.fromSerializableMap(Event.java:212)
	at org.logstash.Event.deserialize(Event.java:399)
	... 61 more
Caused by: java.lang.IllegalArgumentException: No enum constant org.logstash.bivalues.BiValues.[B
	at java.lang.Enum.valueOf(Enum.java:238)
	at org.logstash.bivalues.BiValues.valueOf(BiValues.java:20)
	at org.logstash.bivalues.BiValues.newBiValue(BiValues.java:88)
	at org.logstash.Valuefier.convertNonCollection(Valuefier.java:48)
	... 67 more

The error stack trace is very similar to the one in "Logstash Crash with Persistent Queue and Kafka Input", except that issue involves the Kafka input, while we hit it purely with PQs.

It seems like a Java byte array has somehow made it into an Event before it was put into the Persistent Queue.

Clearly the byte array can be serialized via CBOR into binary form (which is then persisted). However, when it is taken out of the queue, the deserialization from binary to Map succeeds but the conversion from Map to Event fails, as the backtrace shows:

	at org.logstash.Event.<init>(Event.java:66)
	at org.logstash.Event.fromSerializableMap(Event.java:212)
	at org.logstash.Event.deserialize(Event.java:399)

At Event.java:66 a new Event is created from the Map, but we don't have a BiValue handler for byte arrays at the moment.
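
For illustration, here is a minimal, standalone Java sketch of the lookup pattern that produces this error shape. The class and constant names are made up and this is not the org.logstash.bivalues.BiValues source; it only shows why the message names the constant "[B": the handler is resolved from the value's Java class name, byte[].getClass().getName() is "[B", and Enum.valueOf has no matching constant.

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a class-name-keyed handler lookup, similar in shape
// to the failing BiValues path but not the actual Logstash code.
public class BiValueLookupSketch {

    enum Handlers {
        STRING, LONG;

        private static final Map<String, Handlers> CACHE = new HashMap<>();
        static {
            CACHE.put("java.lang.String", STRING);
            CACHE.put("java.lang.Long", LONG);
        }

        // Resolve a handler by class name; unknown names fall through to
        // Enum.valueOf, which throws "No enum constant Handlers.<name>".
        static Handlers forValue(Object value) {
            String className = value.getClass().getName();
            Handlers cached = CACHE.get(className);
            return cached != null ? cached : valueOf(className);
        }
    }

    public static void main(String[] args) {
        System.out.println(Handlers.forValue("hello"));   // STRING
        System.out.println(Handlers.forValue(42L));       // LONG
        // byte[].getClass().getName() is "[B": no constant matches, so this
        // throws java.lang.IllegalArgumentException: No enum constant ... [B
        System.out.println(Handlers.forValue(new byte[] {1, 2, 3}));
    }
}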

There are two ways to fix this:

  1. Detect and eliminate the source of the Byte array. For this we need your config and some lines of scrubbed data.
  2. Add a Byte array Bivalue to the Java Event. For this we need to change some LS core code and release a patch.

You might be able to achieve (1) without new LS core code (maybe an input plugin will need a fix). But for (2) you will have to wait for us to fix the Event code.
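
To make option (2) a bit more concrete, any byte[] value would have to be coerced into a type the Event model already supports (for example a String). Below is a rough, standalone sketch of that kind of coercion applied to a deserialized map; the helper name is hypothetical and this is not a drop-in Logstash patch, only an illustration of the shape of the fix.

import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: walk a deserialized map and replace any byte[] values
// with a Base64 String so downstream conversion sees a supported type.
public class ByteArrayCoercer {

    @SuppressWarnings("unchecked")
    public static Object coerce(Object value) {
        if (value instanceof byte[]) {
            // Base64 preserves arbitrary binary; new String(bytes, UTF_8)
            // would be an alternative if the payload is known to be text.
            return Base64.getEncoder().encodeToString((byte[]) value);
        }
        if (value instanceof Map) {
            Map<String, Object> out = new HashMap<>();
            for (Map.Entry<String, Object> e : ((Map<String, Object>) value).entrySet()) {
                out.put(e.getKey(), coerce(e.getValue()));
            }
            return out;
        }
        if (value instanceof List) {
            return ((List<Object>) value).stream()
                    .map(ByteArrayCoercer::coerce)
                    .collect(Collectors.toList());
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("message", "ok");
        event.put("payload", new byte[] {1, 2, 3});
        System.out.println(coerce(event)); // e.g. {message=ok, payload=AQID}
    }
}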

@guyboertje thanks so much for the fast response! We will take some of the corrupted files, scrub them, and attach them along with the configuration. We don't have the original logs because they were never indexed, and we log at scale from 50+ applications, so it's hard to tell which logs actually triggered the bug; all we have is the corrupt queue. I can PM you some of the corrupt queue files, though. That's what makes this bug hard to deal with. The configuration is posted below.

We don't use that many input plugins, and it's hard for us to enable/disable them because our downstreams depend on them. Is there anything I can add to the ES GitHub repo to track adding a byte array BiValue to Event? :slight_smile:

And here is our scrubbed logstash config:

input {
    beats {
        port => ...
    }

    tcp {
        codec => "json"
        type => ...
        port => ...
    }
    udp {
        codec => "json"
        type => ...
        port => ...
    }
    udp {
        type => ...
        port => ...
    }
    udp {
        type => ...
        port => ...
    }
    tcp {
        type => ...
        mode => "server"
        port => ...
    }
    tcp {
        type => ...
        mode => "server"
        port => ...
    }
    tcp {
        type => ...
        mode => "server"
        port => ...
    }
    tcp {
        type => ...
        mode => "server"
        port => ...
    }
    tcp {
        type => ...
        mode => "server"
        port => ...
    }
    tcp {
        type => ...
        mode => "server"
        port => ...
    }
    tcp {
        type => ...
        mode => "server"
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    syslog {
        type => ...
    }
    http {
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }

    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    beats {
        port => ...
        add_field => ...
    }
    beats {
        port => ...
        add_field => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    beats {
        port => ...
        add_field => ...
    }
    beats {
        port => ...
        add_field => ...
    }
    http {
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    http {
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
    tcp {
        codec => "json_lines"
        type => ...
        port => ...
    }
}

filter {
  if [type] == "..." {
    mutate {
      remove_field => ...
    }
    mutate {
      rename => ...
    }
  }
}

filter {
if [type] in [ ... ]  {
    grok {
      match => ...
    }
    json {
      source => ...
    }
    mutate {
      remove_field => ...
      remove_tag => ...
    }
    useragent {
      source => ...
      prefix => ...
    }
    geoip {
      ...
    }
    mutate {
      convert => ...
    }
    mutate {
      convert => ...
    }
    mutate {
      convert => ...
    }
}

  if [type] == "..." {
    grok {
      match => ...
      overwrite => ...
    }

    date {
      match => ...
      remove_field => ...
    }

    date {
      match => ...
      remove_field => ...
    }
  }
  if [type] == "..." {
    mutate {
        convert => ...
        rename => ...
        gsub => ...
    }
  }

  if [type] in [ ... ] {
    mutate {
        convert => ...
        rename => ...
        gsub => ...
    }
  }

  if [type] in [ ... ]  {
      geoip {
        ...
      }
      mutate {
        convert => ...
      }
  }

  if [type] in [ ... ] {
    mutate {
      add_field => ...
    }
  }

   if [logger_name] {
    mutate {
      add_field => ...
    }
    mutate {
      gsub => ...
    }
   }

  if [type] == "..." {
    mutate {
      rename => ...
    }
    date {
      match => ...
      remove_field => ...
    }
    grok {
        match => ...
        remove_field => ...
    }
    grok {
        match => ...
    }
  }

}

output {
    elasticsearch {
        index => "..."
        codec => "json"
        hosts => [ ... ]
     }
}

And here is our logstash.yml (already referenced above; pasting the relevant parts here):

pipeline.workers: 2
queue.type: persisted
path.queue: /queue/
queue.max_bytes: 150gb
http.host: ...
path.config: ...
path.logs: ...

With 46 inputs in your config, this problem is almost impossible to troubleshoot without a second machine to divide and conquer.

  1. Move half the inputs, plus the filters and output, to the other machine - 23 each.
  2. If the fault occurs on both, split by technology: tcp/udp on one machine and the rest on the other. Repeat until the fault stays on one machine.
  3. Keep moving inputs one by one from the faulty machine to the good machine until the fault moves over.
  4. Move the last one back and move the next one - repeat until only the inputs that are creating the fault remain.
  5. Post that faulty config here.


Looks like it was reproduced in https://github.com/elastic/logstash/issues/6756#issuecomment-313502976

And a fix is coming in https://github.com/elastic/logstash/pull/7604

Hopefully that will resolve this issue. @guyboertje, thanks for your support so far.

@subhasdan unfortunately this is not the same issue as https://github.com/elastic/logstash/pull/7604, which relates to a missing @timestamp field when generating a String representation of the Event.

I am trying to understand under what conditions a byte[] can end up in an Event field. We need to be able to reproduce this; it needs further investigation.


Quick follow-up: in retrospect, this was most probably caused by the Bignum/BigInteger issue discussed in https://github.com/elastic/logstash/issues/8379.