Exception in pipelineworker, the pipeline stopped processing new events

Hello,
I have 50 pipelines, and I keep getting this error:

[2018-09-19T10:30:12,824][ERROR][logstash.pipeline ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {:pipeline_id=>"pipe1", "exception"=>"undefined method `multi_filter' for nil:NilClass", "backtrace"=>["(eval):540:in `block in initialize'", "org/jruby/RubyArray.java:1734:in `each'", "(eval):534:in `block in initialize'", "(eval):95:in `block in filter_func'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:340:in `filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:319:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:285:in `block in start_workers'"], :thread=>"#<Thread:0x17ca5935@/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:42 sleep>"}

It happens on a different pipeline every time.
Each pipeline works just fine when run independently.
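
For context, the pipelines are declared in pipelines.yml along these lines (the pipeline IDs and paths below are placeholders, not the real config):

    - pipeline.id: pipe1
      path.config: "/etc/logstash/conf.d/pipe1.conf"
    - pipeline.id: pipe2
      path.config: "/etc/logstash/conf.d/pipe2.conf"
    # ... one entry per pipeline, 50 in total

"Run independently" means pointing a standalone Logstash at just one of those files (e.g. bin/logstash -f pipe1.conf); run that way, none of them throw this error.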

What is even more strange:

  • I can ingest about 50,000 lines when logstash.yml uses error-level logging:
    log.level: error
    config.debug: true

  • I can ingest 500,000 lines when logstash.yml uses debug-level logging (both variants are sketched below).
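
For reference, the two logstash.yml variants being compared look roughly like this (only the logging-related keys are shown):

    # error-level logging: pipelines start failing after ~50,000 lines
    log.level: error
    config.debug: true

    # debug-level logging: the same data gets through ~500,000 lines
    # (config.debug not mentioned for this run)
    log.level: debug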

How does log.level: error differ from debug in its impact on the pipeline workflow?
(I am processing the same data but getting different results.)

What is happening here? Could this be Logstash's way of saying that it cannot process any more data within the pipeline?

EDIT: For us, this issue was resolved in Logstash 6.5.0 by https://github.com/elastic/logstash/pull/10113


I'm facing the same issue on one of our Logstash clusters. We have multiple clusters, but only this one is affected. Each cluster has the same number of pipelines, so I strongly suspect it's caused by something in our config. All clusters are running version 6.4.2.

It almost always happens when I restart Logstash (but on a different pipeline each time). After crash-looping for a while it stops throwing errors and runs fine until the next time I restart Logstash.

I haven't measured performance differences in debug vs error logging.

The errors:

[2018-10-15T16:25:17,910][ERROR][logstash.pipeline        ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {:pipeline_id=>"kibana_logs", "exception"=>"undefined method `multi_filter' for nil:NilClass", "backtrace"=>["(eval):281:in `block in initialize'", "org/jruby/RubyArray.java:1734:in `each'", "(eval):278:in `block in initialize'", "(eval):58:in `block in filter_func'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:341:in `filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:320:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:286:in `block in start_workers'"], :thread=>"#<Thread:0x5fbafb20@/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:46 sleep>"}
[2018-10-15T15:58:00,114][ERROR][logstash.pipeline        ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {:pipeline_id=>"kibana_logs", "exception"=>"undefined method `call' for nil:NilClass\nDid you mean?  caller", "backtrace"=>["(eval):58:in `block in filter_func'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:341:in `filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:320:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:286:in `block in start_workers'"], :thread=>"#<Thread:0x5d153069@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:157 sleep>"}

We're using a third-party plugin, and I would very much like to blame it for these errors, but the stack trace gives no indication that anything is happening outside logstash-core.

A sanitized version of a pipeline config where the error happened:

input {
  kafka {
    bootstrap_servers => "host1.local:9093,host2.local:9093,host3.local:9093"
    topics => "a_prefix"
    codec => json
    security_protocol => "SSL"
    ssl_truststore_location => "/usr/share/logstash/config/ssl/truststore.jks"
    ssl_truststore_password => "a password"
    ssl_truststore_type => "JKS"
    ssl_keystore_location => "/usr/share/logstash/config/ssl/keystore.jks"
    ssl_keystore_password => "a password"
    ssl_keystore_type => "JKS"
    client_id => "host1.local.a_prefix"
  }
  kafka {
    bootstrap_servers => "host1.local:9093,host2.local:9093,host3.local:9093"
    topics => "input_metrics"
    codec => json
    security_protocol => "SSL"
    ssl_truststore_location => "/usr/share/logstash/config/ssl/truststore.jks"
    ssl_truststore_password => "a password"
    ssl_truststore_type => "JKS"
    ssl_keystore_location => "/usr/share/logstash/config/ssl/keystore.jks"
    ssl_keystore_password => "a password"
    ssl_keystore_type => "JKS"
    client_id => "host1.local.a_prefix.metrics"
  }
}

filter {
  if "input_metrics" not in [tags] {
    metrics {
      timer => [ "events", "%{request_time}" ]
      rates => [ 1, 5, 15 ]
      percentiles => []
      add_tag => [ "output_metrics", "a_prefix" ]
    }
  }
}

output {
  if "input_metrics" in [tags] {
    elasticsearch {
      hosts => ["host1.local:9200", "host2.local:9200", "host3.local:9200", "host4.local:9200", "host5.local:9200"]
      ssl => true
      cacert => "/usr/share/logstash/config/ssl/ca.crt"
      index => "input_metrics-%{+YYYY.MM.dd}"
      user => "logstash_internal"
      password => "a password"
    }
  } else if "output_metrics" in [tags] {
    elasticsearch {
      hosts => ["host1.local:9200", "host2.local:9200", "host3.local:9200", "host4.local:9200", "host5:9200"]
      ssl => true
      cacert => "/usr/share/logstash/config/ssl/ca.crt"
      index => "metrics-%{+YYYY.MM.dd}"
      user => "logstash_internal"
      password => "a password"
    }
  } else {
    elasticsearch {
      hosts => ["host1.local:9200", "host2.local:9200", "host3.local:9200", "host4.local:9200", "host5.local:9200"]
      ssl => true
      cacert => "/usr/share/logstash/config/ssl/ca.crt"
      index => "a_prefix-%{+YYYY.MM.dd}"
      user => "logstash_internal"
      password => "a password"
    }

    azure {
      storage_account_name => "storage account name"
      storage_access_key => "a key"
      container_name => "a_prefix"
      size_file => 52428800
      time_file => 15
      restore => true
      temporary_directory => "/var/tmp/logstash-output-azure/a_prefix"
      prefix => "a_prefix"
      upload_queue_size => 2
      upload_workers_count => 1
      rotation_strategy_val => "size_and_time"
      tags => []
      encoding => "gzip"
    }
  }
}
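
A note on the wiring above, since it may look odd: the metrics filter periodically flushes brand-new events carrying the tags from add_tag (here "output_metrics" and "a_prefix"), which is why the output section routes on tags rather than on fields. A throwaway way to eyeball those flushed events while debugging is a plain stdout output, something along these lines (not part of the actual config):

output {
  if "output_metrics" in [tags] {
    # debug-only: print the flushed metric events (rates, timer stats) to the console
    stdout { codec => rubydebug }
  }
}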
