Logstash-6.5.2 filter worker crash with Exception in pipelineworker


#1

Hello

We have a problem with our Logstash installation and we think it is a bug.

Logstash version: 6.5.2
OS: Centos7

Description

One of the pipelines in our Logstash installation stops working with an "Exception in pipelineworker" error. The pipeline continues to accept events via the defined input and save them to the persisted queue, but it no longer runs them through the filter/output sections, so the persisted queue keeps growing with unprocessed events.

Cause

We have identified the cause of the problem. The filter worker crashes when a geoip filter is used and the geoip source field contains an empty JSON array. This is triggered because our clients sometimes generate a log with an empty value for the attribute referenced by geoip/source.
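The backtrace shows java.util.ArrayList.get throwing from GeoIPFilter.handleEvent (line 120), which is consistent with a get(0) on an empty list of source values. A minimal standalone Java sketch of that failure mode (variable names are ours, not the plugin's):

```java
import java.util.ArrayList;
import java.util.List;

public class EmptyListCrash {
    public static void main(String[] args) {
        // An empty "ipv4_address" JSON array arrives as an empty list.
        List<String> ipv4Address = new ArrayList<>();
        try {
            // Fetching the first source value from an empty list throws,
            // matching the ArrayList.get frame in the backtrace.
            String ip = ipv4Address.get(0);
            System.out.println("resolved " + ip);
        } catch (IndexOutOfBoundsException e) {
            // On Java 8 (as in the backtrace) the message reads
            // "Index: 0, Size: 0"; newer JDKs word it differently.
            System.out.println("crashed: " + e.getMessage());
        }
    }
}
```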

Full error log

[2018-12-07T12:54:16,727][ERROR][logstash.pipeline        ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {:pipeline_id=>"test", "exception"=>"Index: 0, Size: 0", "backtrace"=>["java.util.ArrayList.rangeCheck(java/util/ArrayList.java:657)", "java.util.ArrayList.get(java/util/ArrayList.java:433)", "org.logstash.filters.GeoIPFilter.handleEvent(org/logstash/filters/GeoIPFilter.java:120)", "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:453)", "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:314)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_filter_minus_geoip_minus_5_dot_0_dot_3_minus_java.lib.logstash.filters.geoip.filter(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/lib/logstash/filters/geoip.rb:111)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_filter_minus_geoip_minus_5_dot_0_dot_3_minus_java.lib.logstash.filters.geoip.RUBY$method$filter$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_3_dot_0/gems/logstash_minus_filter_minus_geoip_minus_5_dot_0_dot_3_minus_java/lib/logstash/filters//usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/lib/logstash/filters/geoip.rb)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.do_filter(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:143)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$do_filter$0$__VARARGS__(usr/share/logstash/logstash_minus_core/lib/logstash/filters//usr/share/logstash/logstash-core/lib/logstash/filters/base.rb)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.block in 
multi_filter(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:162)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1734)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.multi_filter(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$multi_filter$0$__VARARGS__(usr/share/logstash/logstash_minus_core/lib/logstash/filters//usr/share/logstash/logstash-core/lib/logstash/filters/base.rb)", "usr.share.logstash.logstash_minus_core.lib.logstash.filter_delegator.multi_filter(/usr/share/logstash/logstash-core/lib/logstash/filter_delegator.rb:44)", "RUBY.block in filter_func((eval):68)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline.filter_batch(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:341)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline.worker_loop(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:320)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline.RUBY$method$worker_loop$0$__VARARGS__(usr/share/logstash/logstash_minus_core/lib/logstash//usr/share/logstash/logstash-core/lib/logstash/pipeline.rb)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline.block in start_workers(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:286)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:246)", "java.lang.Thread.run(java/lang/Thread.java:748)"], :thread=>"#<Thread:0x4390727b sleep>"}

Pipeline configuration

cat /etc/logstash/logstash.yml
#
# logstash.yml
#
path.data: /var/lib/logstash
pipeline.batch.size: 1024
config.reload.automatic: true
config.reload.interval: 60s
queue.type: persisted
queue.drain: true
queue.max_bytes: 4gb
path.logs: /var/log/logstash

cat /etc/logstash/pipelines.yml
- pipeline.id: test
  path.config: "/etc/logstash/conf.d/test.conf"
  pipeline.workers: 2

cat /etc/logstash/conf.d/test.conf
input {
  beats {
    port => 5044
    add_field => { "pipeline_receiver" => "beats-receiver-5044" }
  }
}

filter {
  json {
    source => "message"
  }
  geoip {
    source => "[MachineInfo][ipv4_address]"
  }
}

output {
  file {
    path => "/tmp/test_pipeline.txt"
  }
}

Sample Data

{"MachineInfo":{"ipv4_address":["8.8.8.8"],"json_test":"json test"},"comment":"Testing-1"}
{"MachineInfo":{"json_test":"json test"},"comment":"Testing-2"}
{"MachineInfo":{"ipv4_address":[],"json_test":"json test"},"comment":"Testing-3"}

Steps to Reproduce

  • Use Filebeat to send the sample data to a Logstash instance
  • Append the sample data lines to the log file defined in the Filebeat input, one at a time
  • The first and second logs are processed and saved to the output file
  • The third log crashes the filter worker with an "Exception in pipelineworker" error
  • After this, the pipeline continues accepting logs and saving them to the persisted queue, but they are never processed.
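As a temporary workaround until the plugin handles empty arrays, wrapping the geoip filter in a conditional should keep the problematic events out of it. An untested sketch against our test.conf, assuming Logstash's bracketed field-reference syntax with an array index:

```
filter {
  json {
    source => "message"
  }
  # Only run geoip when the source field exists and has a first element.
  if [MachineInfo][ipv4_address] and [MachineInfo][ipv4_address][0] {
    geoip {
      source => "[MachineInfo][ipv4_address]"
    }
  }
}
```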