Beats input: the pipeline is blocked, temporary refusing new connection


#1

I am using elasticsearch 1.7.1, logstash 1.5.4, and testing filebeat.
It runs perfectly if I use filebeat to scan log files and use filebeat to output log data directly to elasticsearch as follows:
logfiles -> filebeat -> elasticsearch

It also runs perfectly if I use filebeat to scan log files and output to logstash, then use logstash to write the data to another file, as follows:
logfiles -> filebeat -> logstash -> another file

But when I use logstash to output to elasticsearch, like this:
logfiles -> filebeat -> logstash -> elasticsearch
logstash reports a warning, and elasticsearch never gets the data.

This is the warning from logstash side:
Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover. {:exception=>LogStash::CircuitBreaker::OpenBreaker, :level=>:warn}
Beats input: the pipeline is blocked, temporary refusing new connection. {:level=>:warn}

This is the error from filebeat side:
...
2016/01/08 22:30:07.441115 single.go:121: INFO Connecting error publishing events (retrying): dial tcp 127.0.0.1:5044: getsockopt: connection refused
2016/01/08 22:30:07.441146 single.go:143: INFO send fail
2016/01/08 22:30:07.441159 single.go:150: INFO backoff retry: 4s
...

Here is my conf file for logstash (posted as an image, not reproduced here)

Here is my yml file for filebeat (posted as an image, not reproduced here)

The elasticsearch log shows:
[2016-01-08 17:55:59,831][WARN ][http.netty ] [dev-elkstack] Caught exception while handling client http traffic, closing connection [id: 0xe359725c, /10.100.16.221:52237 => /10.100.16.175:9200]
java.lang.IllegalArgumentException: empty text
at org.elasticsearch.common.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:89)
at org.elasticsearch.common.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:62)
at org.elasticsearch.common.netty.handler.codec.http.HttpRequestDecoder.createMessage(HttpRequestDecoder.java:75)
at org.elasticsearch.common.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:191)
at org.elasticsearch.common.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:102)
at org.elasticsearch.common.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:500)
at org.elasticsearch.common.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435)
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:74)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


(Magnus Bäck) #2

Your elasticsearch output configuration is incorrect; you're trying to use the node or transport protocol against ES's HTTP port. Since the node and transport protocols are deprecated and removed from the elasticsearch output plugin in Logstash 2.0, I suggest you switch to the HTTP protocol with protocol => "http". Then the use of port 9200 will be correct.


(Mark Walkom) #3

Also, just as a general comment, try not to post images of text files. It's much better to paste and format them as text so that others can test them (if need be), and some images may not show (for whatever reason).


#4

Thank you very much. I used
host => "dev-elkstack:9200"
protocol => "http"
and it works!
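
For anyone landing here with the same symptom, here is a minimal sketch of where those two settings go in a Logstash 1.5.x config. Only host and protocol come from this thread; the surrounding input block and the port number 5044 (taken from the filebeat error above) are assumptions about the rest of the file, which was not posted as text.

```
# Sketch only, for Logstash 1.5.x. Adjust to your own setup.
input {
  beats {
    port => 5044                  # assumed: the port filebeat was dialing in the error above
  }
}

output {
  elasticsearch {
    host => "dev-elkstack:9200"   # Elasticsearch HTTP port
    protocol => "http"            # must match the port: 9200 speaks HTTP, not node/transport
  }
}
```

On Logstash 2.0 and later the protocol option no longer exists and host is replaced by hosts, so this fragment applies to the 1.5.x series only.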


#5

Hi Mark, I'm seeing the same problem, but I'm running elasticsearch 2.3.3 and logstash 2.3.2 with the logstash output configuration below. This newer ELK stack defaults to the HTTP protocol, so the solution doesn't really apply. What else could be the reason? Thank you.

elasticsearch {
  hosts => ["server1", "server2", "server3"]
  index => "app-%{+YYYY.MM.dd}"
}

{:timestamp=>"2016-07-08T18:15:32.204000+0800", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}
{:timestamp=>"2016-07-08T18:15:32.205000+0800", :message=>"CircuitBreaker::Open", :name=>"Beats input", :level=>:warn}
{:timestamp=>"2016-07-08T18:15:32.208000+0800", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::OpenBreaker, :level=>:warn}
{:timestamp=>"2016-07-08T18:15:32.707000+0800", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}
{:timestamp=>"2016-07-08T18:15:33.210000+0800", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}


(Steffen Siering) #6

Please start another topic.

