Beats input: the pipeline is blocked

Hi,

I'm using logstash 2.2.0 and logstash-input-beats (2.1.2).

This error appeared when I added a grok pattern to my filter. My original filter looks like this:

> filter {
>         if "java_devices" in [source] {
>                  grok {
>                         match => ["message","%{DATA:tango_level} %{DATA:tango_date} %{DATA:tango_heur} %{DATA:device_name} %{DATA:drop} %{DATA:thread_name} \| %{DATA:method_name}:%{DATA:ligne_code:int} - (?m)%{GREEDYDATA:msg}"]
>                         add_tag => "groked"
>                         break_on_match => true
>                 }
>                 mutate {
>                         merge => { "[tags]" => "[fields][tags]" }
>                         remove_field => "[fields][tags]"
>                         remove_tag => ["beats_input_codec_plain_applied"]
>                 }
>         }
> }

This filter works very well. The problem starts when I add a second pattern to the grok match option, like this:

> filter {
>         if "java_devices" in [source] {
>                  grok {
>                         match => ["message","(?m)%{DATA:tango_level} %{DATA:tango_date} %{DATA:tango_heur} %{DATA:device_name} %{DATA:drop} %{DATA:thread_name} \| %{DATA:method_name}:%{DATA:ligne_code:int} - exception message is: %{DATA:msg}\n%{DATA:error_level}:\n[[:space:]][[:space:]]- desc: %{DATA:desc}\n[[:space:]][[:space:]]- origin: %{DATA:origin}\n[[:space:]][[:space:]]- reason: %{DATA:reason}\n[[:space:]][[:space:]]- severity: %{DATA:severity}\n\n%{DATA:error_level}:\n[[:space:]][[:space:]]- desc: %{DATA:desc}\n[[:space:]][[:space:]]- origin: %{DATA:origin}\n[[:space:]][[:space:]]- reason: %{DATA:reason}\n[[:space:]][[:space:]]- severity: %{DATA:severity}$",
>                                   "message","%{DATA:tango_level} %{DATA:tango_date} %{DATA:tango_heur} %{DATA:device_name} %{DATA:drop} %{DATA:thread_name} \| %{DATA:method_name}:%{DATA:ligne_code:int} - (?m)%{GREEDYDATA:msg}"]
>                         add_tag => "groked"
>                         break_on_match => true
>                 }
>                 mutate {
>                         merge => { "[tags]" => "[fields][tags]" }
>                         remove_field => "[fields][tags]"
>                         remove_tag => ["beats_input_codec_plain_applied"]
>                 }
>         }
> }

With this configuration, the Logstash Beats input logs the following errors:

> {:timestamp=>"2016-08-30T09:32:09.000000+0200", :message=>"Starting pipeline", :id=>"base", :pipeline_workers=>1, :batch_size=>125, :batch_delay=>5, :max_inflight=>125, :level=>:info}
> {:timestamp=>"2016-08-30T09:32:09.001000+0200", :message=>"Pipeline started", :level=>:info}
> {:timestamp=>"2016-08-30T09:33:06.014000+0200", :message=>"CircuitBreaker::rescuing exceptions", :name=>"Beats input", :exception=>LogStash::Inputs::Beats::InsertingToQueueTakeTooLong, :level=>:warn}
> {:timestamp=>"2016-08-30T09:33:06.017000+0200", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
> {:timestamp=>"2016-08-30T09:33:06.285000+0200", :message=>"CircuitBreaker::rescuing exceptions", :name=>"Beats input", :exception=>LogStash::Inputs::Beats::InsertingToQueueTakeTooLong, :level=>:warn}
> {:timestamp=>"2016-08-30T09:33:06.285000+0200", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
> {:timestamp=>"2016-08-30T09:33:09.007000+0200", :message=>"CircuitBreaker::rescuing exceptions", :name=>"Beats input", :exception=>LogStash::Inputs::Beats::InsertingToQueueTakeTooLong, :level=>:warn}
> {:timestamp=>"2016-08-30T09:33:09.008000+0200", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
> {:timestamp=>"2016-08-30T09:33:39.012000+0200", :message=>"Beats Input: Remote connection closed", :peer=>"195.221.0.33:46256", :exception=>#<Lumberjack::Beats::Connection::ConnectionClosed: Lumberjack::Beats::Connection::ConnectionClosed wrapping: EOFError, End of file reached>, :level=>:warn}
> {:timestamp=>"2016-08-30T09:34:37.033000+0200", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn} 

Does anyone have an idea?

Your Logstash instance isn't processing messages fast enough, either because its own throughput is too low or because it's throttled by the slowness of its outputs.
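
One way to see how fast events actually get through the filters is to meter them. The sketch below is only an illustration and not part of the original setup: it assumes the `metrics` filter plugin is installed and that the `%{[events][rate_1m]}` field reference works in your Logstash version. The meter name `events` and the tag `metric` are arbitrary choices.

```
filter {
  # ... your existing filters stay above this point ...

  # Assumption: logstash-filter-metrics is available.
  # Counts every event that reaches this point and periodically
  # emits an extra event tagged "metric" that carries the rates.
  metrics {
    meter   => "events"
    add_tag => "metric"
  }
}

output {
  # Print only the metric events (1-minute moving average, events per second).
  if "metric" in [tags] {
    stdout {
      codec => line { format => "rate_1m: %{[events][rate_1m]}" }
    }
  }
}
```

If the rate stays low even when the regular outputs are disabled, the filters are the more likely bottleneck.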

Thanks for the answer. Do you have any recommendations for solving this issue?

I found my problem: one of my filters was misconfigured.

To find it, I proceeded by elimination. First, I replaced all my outputs with a "null" output to see whether the problem was caused by one of my outputs.
The problem appeared again, so the outputs were not the cause.
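
For reference, the output section for this first test can be as small as the sketch below. It assumes the `null` output plugin (logstash-output-null) is installed; the real outputs are commented out or moved aside while testing.

```
output {
  # Discards every event, so no output can slow the pipeline down.
  null { }
}
```

If the Beats input still reports a blocked pipeline with only this output, the slowdown is upstream of the outputs.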

Then I removed all my filters to see whether the problem was caused by one of them.
The problem disappeared.

So I added my filters back one by one to find out which one was misconfigured.
I finally found the bad filter and changed it.
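
A single filter can also be tried in isolation, away from the Beats input. The following is a minimal sketch under a few assumptions: the `generator` input and the `dots` codec are installed, the sample log line is made up for illustration (replace it with a real line from your logs), and the grok pattern is the original single pattern from the first post.

```
input {
  # Hypothetical sample line, repeated 10000 times.
  generator {
    lines => ["ERROR 2016-08-30 09:32:09 my/device/1 x main | MyClass.run:42 - something failed"]
    count => 10000
  }
}

filter {
  # Put exactly one filter under test here, then swap in the next one.
  grok {
    match => ["message","%{DATA:tango_level} %{DATA:tango_date} %{DATA:tango_heur} %{DATA:device_name} %{DATA:drop} %{DATA:thread_name} \| %{DATA:method_name}:%{DATA:ligne_code:int} - (?m)%{GREEDYDATA:msg}"]
    add_tag => "groked"
  }
}

output {
  # One dot per processed event; a stalled filter shows up as dots that stop or crawl.
  stdout { codec => dots }
}
```

Saved under a name of your choice (for example a hypothetical `test.conf`) and started with `bin/logstash -f test.conf`, this makes it easy to compare how quickly each candidate filter works through the same input.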

I don't get any Logstash errors anymore.

Thanks