I used the beats input (`input { beats { ... } }`) in Logstash to receive events sent by Filebeat, but I ran into a problem. When multiple Filebeat processes send events to multiple Logstash processes, some of the Logstash processes emit the following logs:
{:timestamp=>"2015-12-11T18:30:33.800000+0800", :message=>"SIGTERM received. Shutting down the pipeline.", :level=>:warn}
{:timestamp=>"2015-12-11T18:32:35.903000+0800", :message=>"CircuitBreaker::rescuing exceptions", :name=>"Beats input", :exception=>LogStash::SizedQueueTimeout::TimeoutError, :level=>:warn}
{:timestamp=>"2015-12-11T18:32:35.905000+0800", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
{:timestamp=>"2015-12-11T18:32:41.994000+0800", :message=>"CircuitBreaker::rescuing exceptions", :name=>"Beats input", :exception=>LogStash::SizedQueueTimeout::TimeoutError, :level=>:warn}
{:timestamp=>"2015-12-11T18:32:41.995000+0800", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
{:timestamp=>"2015-12-11T18:32:49.083000+0800", :message=>"CircuitBreaker::rescuing exceptions", :name=>"Beats input", :exception=>LogStash::SizedQueueTimeout::TimeoutError, :level=>:warn}
{:timestamp=>"2015-12-11T18:32:49.084000+0800", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
{:timestamp=>"2015-12-11T18:32:58.172000+0800", :message=>"CircuitBreaker::rescuing exceptions", :name=>"Beats input", :exception=>LogStash::SizedQueueTimeout::TimeoutError, :level=>:warn}
{:timestamp=>"2015-12-11T18:32:58.173000+0800", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
{:timestamp=>"2015-12-11T18:33:03.304000+0800", :message=>"CircuitBreaker::rescuing exceptions", :name=>"Beats input", :exception=>LogStash::SizedQueueTimeout::TimeoutError, :level=>:warn}
{:timestamp=>"2015-12-11T18:33:03.305000+0800", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
{:timestamp=>"2015-12-11T18:33:04.307000+0800", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :level=>:warn}
{:timestamp=>"2015-12-11T18:33:04.405000+0800", :message=>"CircuitBreaker::Open", :name=>"Beats input", :level=>:warn}
{:timestamp=>"2015-12-11T18:33:04.407000+0800", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::CircuitBreaker::OpenBreaker, :level=>:warn}
{:timestamp=>"2015-12-11T18:33:04.808000+0800", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :level=>:warn}
{:timestamp=>"2015-12-11T18:33:05.309000+0800", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :level=>:warn}
{:timestamp=>"2015-12-11T18:33:05.809000+0800", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :level=>:warn}
.......
Logstash version: 1.5.6
OS: CentOS 6.5
The Logstash config file is as follows:
input {
beats {
port => 5044
add_field => {
"BRO_type" => "BRO_connlog"
}
}
}
filter {
if [message] =~ /^#/ {
drop { }
} else {
# BRO_connlog
if [BRO_type] == "BRO_connlog" {
grok {
match => [ "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<src_ip>(.*?))\t(?<src_port>(.*?))\t(?<dst_ip>(.*?))\t(?<dst_port>(.*?))\t(?<protocol>(.*?))\t(?<service>(.*?))\t(?<duration>(.*?))\t(?<src_bytes>(.*?))\t(?<dst_bytes>(.*?))\t(?<conn_state>(.*?))\t(?<local_orig>(.*?))\t(?<missed_bytes>(.*?))\t(?<history>(.*?))\t(?<src_pkts>(.*?))\t(?<src_ip_bytes>(.*?))\t(?<dst_pkts>(.*?))\t(?<dst_ip_bytes>(.*?))\t(?<tunnel>(.*))"]
remove_field => ["beat.hostname", "beat.name", "input_type", "offset"]
}
}
}
}
output {
elasticsearch {
host => "localhost"
cluster => "elasticsearch-cluster"
node_name => "es-001"
flush_size => "20000"
}
}