Logstash Worker Threads Blocked by Pipeline

Hi,

Im using the 7.4.2 Version of the elastic Stack.

One of my Logstash Pipelines Keeps blocking the Worker Threads after a While.
Essentialy grinding all of the ingestion to a halt. As no other Pipelines can be executed anymore.

The pipeline isnt extremely complex. At least we have similar or more complex ones that work just fine with 100x more load on them.

This is the yml file for the pipeline. Tho it is usually split up into 4 parts. The input Application specific filter, generel filter we use for almost all of the pipelines and the output which is similar on every pipeline.

    input {
        pipeline { address => applicationtag }
    }

    filter {
        grok {
            match => { "message" => [
            "^%{TIMESTAMP_ISO8601:timestamp}; %{LOGLEVEL:loglevel}; %{NOTSPACE:thread}; ; %{NOTSPACE:java_class}; The File %{PATH:filepath} was sent correctly to %{NOTSPACE:uri}",
            "^%{TIMESTAMP_ISO8601:timestamp}; %{LOGLEVEL:loglevel}; %{NOTSPACE:thread}; ; %{NOTSPACE:java_class};.*"
            ]}
        }
          date {
           match => [ "timestamp", "dd.MM.yyyy HH:mm:ss,SSS", "ISO8601" ]
           target => "@timestamp"
          }
          mutate {
            remove_field =>["type","offset","input_type","timestamp"]
            uppercase => [ "level" ]
          }
    }

    filter {
        mutate {
            add_field => {
                "[@metadata][application]" => "%{[fields][application]}"
                "application" => "%{[fields][application]}"
            }
        }

        mutate { remove_field => ["[fields][application]" ] }

        if "_grokparsefailure" in [tags] {
            mutate { replace => { "[@metadata][application]" => "grokparsefailure-applicationtag"} }
        }
    }

    output {
        if [level] == "DEBUG" {
          elasticsearch {
            hosts => ["elasticsearchlbaddr.com"]
            manage_template => false
            index => "logs-%{[@metadata][application]}-debug-%{+YYYY-ww}"
          }
        } else {
          elasticsearch {
            hosts => ["elasticsearchlbaddr.com"]
            manage_template => false
            index => "logs-%{[@metadata][application]}-%{+YYYY-ww}"
          }
        }
    }

I have tried looking at the Trace Logs but the only thing i can see in those is the message
[2020-07-14T18:23:05,148][DEBUG][org.logstash.execution.PeriodicFlush] Pushing flush onto pipeline.
Spammed over and over again with no other input.
Here and there a different pipeline manages to get a few documents thru.
Also this message is seen a lot too
[2020-07-14T17:12:00,791][TRACE][org.logstash.beats.ConnectionHandler] d68d5780: sending keep alive ack to libbeat
This also seems to be the Reason why the loadbalancing inside Filebeat wont work.

I had the same issue with a different Pipeline, which could be solved by changing the Grok pattern.
I've tried this on this pipeline too, but it doesnt work this time.

The only other piece of information would be the output of the Hot thread api, that logstash has

In this case logstash had recently been restarted but if i were to let run longer it would eventually reach 99% cpu

Ive tried reducing the Workers to 1 For this pipeline but that didnt resolve the issue. Usually we run with 2 Workers.
The Node has 4cpu cores allocated, from what i understood that should allow 4 workers to work at least ?
So i dont understand how a single worker can then essentially halt the whole system.
The other pipelines are all in state: timed_waiting.

It's also worth noting this pipeline is not getting a lot of load at all. Maybe 7 documents a second.
While the others get a total of 5k/s.

    76.11 % of cpu usage, state: runnable, thread name: '[applicationtag-pipeline]>worker1', thread id: 84 
    	org.joni.ByteCodeMachine.executeSb(ByteCodeMachine.java:321)
    	org.joni.ByteCodeMachine.matchAt(ByteCodeMachine.java:167)
    	org.joni.Matcher.matchCheck(Matcher.java:287)
    	org.joni.Matcher.searchCommon(Matcher.java:431)
    	org.joni.Matcher.search(Matcher.java:301)
    	org.jruby.RubyRegexp.matcherSearch(RubyRegexp.java:231)
    	org.jruby.RubyRegexp.search(RubyRegexp.java:1306)
    	org.jruby.RubyRegexp.matchPos(RubyRegexp.java:1195)
    	org.jruby.RubyRegexp.matchCommon(RubyRegexp.java:1163)
    	org.jruby.RubyRegexp.match_m(RubyRegexp.java:1129)
    	java.lang.invoke.LambdaForm$DMH/2115368148.invokeVirtual_L4_L(LambdaForm$DMH)
    	java.lang.invoke.LambdaForm$BMH/549982920.reinvoke(LambdaForm$BMH)
    	java.lang.invoke.LambdaForm$MH/1384010761.delegate(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/529893402.guard(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/1384010761.delegate(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/529893402.guard(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/753705711.linkToCallSite(LambdaForm$MH)
    	usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.jls_minus_grok_minus_0_dot_11_dot_5.lib.grok_minus_pure.RUBY$method$execute$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/jls-grok-0.11.5/lib/grok-pure.rb:182)
    	java.lang.invoke.LambdaForm$DMH/1262773598.invokeStatic_L7_L(LambdaForm$DMH)
    	java.lang.invoke.LambdaForm$BMH/393594087.reinvoke(LambdaForm$BMH)
    	java.lang.invoke.LambdaForm$MH/1384010761.delegate(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/529893402.guard(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/1384010761.delegate(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/529893402.guard(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/753705711.linkToCallSite(LambdaForm$MH)
    	usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_filter_minus_grok_minus_4_dot_1_dot_1.lib.logstash.filters.grok.RUBY$block$grok_till_timeout$1(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-grok-4.1.1/lib/logstash/filters/grok.rb:353)
    	java.lang.invoke.LambdaForm$DMH/1582071873.invokeStatic_L6_L(LambdaForm$DMH)
    	java.lang.invoke.LambdaForm$BMH/951671255.reinvoke(LambdaForm$BMH)
    	java.lang.invoke.LambdaForm$MH/1482087396.invokeExact_MT(LambdaForm$MH)
    	org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:146)
    	org.jruby.runtime.BlockBody.yield(BlockBody.java:114)
    	org.jruby.runtime.Block.yield(Block.java:170)
    	org.jruby.ext.timeout.Timeout.yieldWithTimeout(Timeout.java:126)
    	org.jruby.ext.timeout.Timeout.timeout(Timeout.java:99)
    	org.jruby.ext.timeout.Timeout.timeout(Timeout.java:75)
    	org.jruby.ext.timeout.Timeout$INVOKER$s$timeout.call(Timeout$INVOKER$s$timeout.gen)
    	java.lang.invoke.LambdaForm$DMH/323981046.invokeVirtual_L8_L(LambdaForm$DMH)
    	java.lang.invoke.LambdaForm$BMH/1381341801.reinvoke(LambdaForm$BMH)
    	java.lang.invoke.LambdaForm$MH/1487424018.guardWithCatch(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/1904652802.identity_L(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/2124643775.delegate(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/26229597.guard(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/2124643775.delegate(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/26229597.guard(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/1254651534.linkToCallSite(LambdaForm$MH)
    	usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_filter_minus_grok_minus_4_dot_1_dot_1.lib.logstash.filters.grok.RUBY$method$grok_till_timeout$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-grok-4.1.1/lib/logstash/filters/grok.rb:353)
    	java.lang.invoke.LambdaForm$DMH/2025269734.invokeStatic_L9_L(LambdaForm$DMH)
    	java.lang.invoke.LambdaForm$BMH/829217329.reinvoke(LambdaForm$BMH)
    	java.lang.invoke.LambdaForm$MH/2124643775.delegate(LambdaForm$MH)
    	java.lang.invoke.LambdaForm$MH/26229597.guard(LambdaForm$MH)

If there is more info i can provide, I will try to fetch it.

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.