Error publishing events (EOF, broken pipe, i/o timeout, connection reset)


#1

Filebeat can't publish data to Logstash when there is a lot of traffic (~100-150 lines per second). I have read many discussions about "Error publishing events", but nothing really matches my case. I can't find anything about these errors in the Logstash log.

Versions:
filebeat 1.3.1 --> logstash 2.4.0 --> elasticsearch 2.4.1
logstash-input-beats (3.1.3)

filebeat config

filebeat:
  prospectors:
    -
      paths:
        - /data/probe/output/probe_pdu*
      input_type: log
      document_type: probe
      fields_under_root: true
      ignore_older: 24h
  registry_file: /var/lib/filebeat/registry
output:
  logstash:
    hosts: ["10.11.141.29:5044", "10.11.141.30:5044"]
    worker: 1
    compression_level: 3
    loadbalance: true
shipper:
  name: zh01t-tcore01
  tags: []
logging:
  to_syslog: false
  to_files: true
  files:
    path: /var/log/filebeat
    rotateeverybytes: 10485760 # = 10MB
    keepfiles: 5
  level: info

/etc/sysconfig/logstash

LS_OPTS="${LS_OPTS} -b 500 -w 4"
KILL_ON_STOP_TIMEOUT=0

Errors:

2016-11-23T15:04:15Z INFO Start sending events to output
2016-11-23T15:04:18Z INFO Error publishing events (retrying): write tcp 10.11.141.8:60533->10.11.141.29:5045: write: broken pipe
2016-11-23T15:05:14Z INFO Error publishing events (retrying): EOF
2016-11-23T15:06:26Z INFO Error publishing events (retrying): read tcp 10.11.141.8:60648->10.11.141.29:5045: read: connection reset by peer

or

2016-11-23T15:00:19Z INFO Registry file updated. 1 states written.
2016-11-23T15:00:23Z INFO Error publishing events (retrying): read tcp 10.11.141.8:59954->10.11.141.29:5045: i/o timeout
2016-11-23T15:00:25Z INFO Error publishing events (retrying): EOF
2016-11-23T15:01:02Z INFO Error publishing events (retrying): EOF
2016-11-23T15:01:52Z INFO Error publishing events (retrying): write tcp 10.11.141.8:60137->10.11.141.29:5045: write: broken pipe
2016-11-23T15:02:07Z INFO Error publishing events (retrying): EOF
2016-11-23T15:02:24Z INFO Events sent: 2048

or

2016-11-06T09:20:12Z INFO Registry file updated. 37 states written.
2016-11-06T09:20:39Z INFO Error publishing events (retrying): EOF
2016-11-06T09:20:39Z INFO Events sent: 1

#2

logstash config (split across multiple files)

input {
  beats {
    port => 5044
    congestion_threshold => 40
  }
  beats {
    port => 5045
    congestion_threshold => 40
    codec => "json"
  }
}
filter {
  mutate {
    remove_field => ["[beat][hostname]", "count", "host", "input_type", "offset"]
    remove_tag => ["beats_input_codec_plain_applied", "beats_input_codec_json_applied"]
  }
}
filter {
  if [type] == "java" {
    if "log4j" in [tags] {
      grok {
        match => { "message" => ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:logLevel}%{SPACE}\[%{DATA:className}\] \(%{DATA}\) Succesful logged in with username %{DATA:username}$",
                                 "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:logLevel}%{SPACE}\[%{DATA:className}\]" ] }
        break_on_match => true
      }
      date {
        match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS"]
      }
    }
    if [source] =~ "wrapper" {
      mutate {
        add_tag => [ "wrapper" ]
      }
      grok {
        match => { "message" => ["%{TIMESTAMP_ISO8601:timestamp}\|%{DATA:logLevel}\|%{GREEDYDATA:short_message}$" ] }
        remove_field => ["message"]
      }
      date {
        match => [ "timestamp", "YY-MM-dd HH:mm:ss.SSS"]
      }
    } else if [source] =~ "access" {
      mutate {
        add_tag => [ "access" ]
      }
      #add parsing if needed
    } else {
      grok {
        "patterns_dir" => "/etc/logstash/conf.d/patterns"
        match => {
          "message" => ["(?m)\(%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\]\)%{NOT_EMPTY_SPACE}%{DATA:logLevel}%{NOT_EMPTY_SPACE}%{DATA:short_message}\(%{DATA:className}:%{DATA:lineNumber}\)%{OPTIONAL_COLON}%{GREEDYDATA:exception}" ]
        }
        remove_field => ["message"]
      }
      date {
        match => [ "timestamp", "YYYY-MM-dd HH:mm:ss.SSS"]
        timezone => "UTC"
        remove_field => ["timestamp"]
      }
    }
    grok {
      match => {"source" => "(.*/)?%{GREEDYDATA:source}" }
      overwrite => ["source"]
    }
  }
}
output {
  if [type] == "java" {
    elasticsearch {
      hosts => ["10.11.141.29:9200", "10.11.141.30:9200"]
      index => "filebeat-service-%{+YYYY.MM.dd}"
      workers => 2
    }
    if [logLevel] == "ERROR" {
      nagios_nsca {
        host => "10.11.141.38"
        port => 5667
        nagios_host => "%{[beat][name]}"
        nagios_status => 2
        message_format => "%{@timestamp} %{service} %{logLevel}: %{short_message}"
        send_nsca_bin => "/sbin/send_nsca"
        workers => 4
      }
    }
  }
}

filter {
  if [type] == "cdr" {
    grok {
        "patterns_dir" => "/etc/logstash/conf.d/patterns"
        match => {
          "message" => [
            "%{UUID:guid},%{TIMESTAMP_ISO8601:timestamp},%{MSG_INCOMING:recordType},%{TIMESTAMP_ISO8601:receivedTime},%{NUMBER:customerTrunk},%{DATA:aNumber},%{NUMBER:sentTon},%{NUMBER:npi},%{DATA:bNumber},%{NUMBER:dlrMask},%{NUMBER:dcs},%{DATA:hash}$",
            "%{UUID:guid},%{TIMESTAMP_ISO8601:timestamp},%{MSG_FINALIZE:recordType},%{DATA:reason}$"
          ]
        }
        remove_field => [ "message" ]
    }
    date {
         match => [ "timestamp", "ISO8601"]
         remove_field => ["timestamp"]
    }
    grok {
      match => {"source" => "(.*/)?%{GREEDYDATA:source}" }
      overwrite => ["source"]
    }
  }
}
output {
  if [type] == "cdr" {
    elasticsearch {
      hosts => ["10.11.141.29:9200", "10.11.141.30:9200"]
      index => "filebeat-technical-%{+YYYY.MM.dd}"
      workers => 2
    }
  }
}
filter {
  if [type] == "kannel" {
    grok {
        match => {
          "message" => [
            "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA}\] \[%{DATA}\] %{DATA:logLevel}: %{GREEDYDATA:short_message}"
          ]
        }
        remove_field => [ "message" ]
    }
    date {
         match => [ "timestamp", "YYYY-MM-dd HH:mm:ss"]
         remove_field => ["timestamp"]
    }
  }
}
output {
  if [type] == "kannel" {
    elasticsearch {
      hosts => ["10.11.141.29:9200", "10.11.141.30:9200"]
      index => "filebeat-kannel-%{+YYYY.MM.dd}"
      workers => 2
    }
  }
}
filter {
  if [type] == "probe" {
    date {
         match => [ "reqTimestamp", "ISO8601"]
         remove_field => [ "source" ]
    }
  }
}
output {
  if [type] == "probe" {
    elasticsearch {
      hosts => ["10.11.141.29:9200", "10.11.141.30:9200"]
      index => "filebeat-tech-probe-%{+YYYY.MM.dd}"
      workers => 2
    }
  }
}

(Andrew Kroh) #3

Those Filebeat log messages most likely indicate you are hitting the congestion threshold in Logstash and Logstash is dropping the connection.

You could do some benchmarking to find the bottleneck in your system. Test your LS pipeline with stdin/stdout and measure the throughput. stdout { codec => dots } works nicely if you pipe the output through pv -War > /dev/null: the dots codec prints one character per event, so the rate pv reports corresponds to events per second.
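
A rough sketch of such a benchmark setup, in case it helps. The file name bench.conf, the sample file, and the /opt/logstash install path are assumptions for illustration, not something taken from your setup; the type is set on the input only so that the type-based conditionals in the filters would match.

```
# bench.conf (hypothetical file name)
# Assumed invocation; adjust the paths to your installation:
#   /opt/logstash/bin/logstash -f bench.conf < sample_probe.log | pv -War > /dev/null

input {
  stdin {
    type => "probe"    # so that conditionals such as [type] == "probe" apply
  }
}

# Paste the filter { ... } sections of the real pipeline here.

output {
  stdout {
    codec => dots      # one "." per event, so bytes/s in pv is roughly events/s
  }
}
```

Running it once with only the dots output and once with the real outputs added should show whether the filters or the outputs are the limiting factor.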

BTW, the beats input (logstash-input-beats) you have defined on port 5045 looks incorrect because you have it set up with the json codec. But that should be unrelated to this problem.
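
One common alternative, shown only as a sketch: leave the codec off the beats input and decode the line with the json filter instead. This assumes each forwarded line is a single JSON object that arrives in the message field, which I have not verified against your data.

```
input {
  beats {
    port => 5045       # no codec set on the input
  }
}

filter {
  if [type] == "probe" {
    json {
      source => "message"    # parse the JSON text carried in the message field
    }
  }
}
```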


#4

It may be related to the same problem: if I stop or restart Logstash, it is not able to shut down safely.

The log looks like this:


#5

Is it not correct? The log file that Filebeat forwards is in JSON format, and it works fine in the test.


#6

Without any other output:
avg [ 350 B/s] max was [ 500 B/s]

So there should be no problem?
I have 2 Logstash instances/servers, so that should double the throughput.

with all outputs:
avg [37.7 B/s] max was [ 150 B/s]


(Andrew Kroh) #7

Those throughput numbers (with outputs enabled) look too low to sustain a rate of 100-150 lines per second. Have you run through the Performance Troubleshooting Guide? When running the throughput test, it would be good to check whether your CPU is saturated. If not, possibly increase the number of workers (as mentioned in that guide).
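
For example, the worker count is set through the -w flag you already pass in /etc/sysconfig/logstash. The value 8 below is only a placeholder to illustrate the change, not a recommendation; a sensible value depends on your core count and on what the throughput test shows.

```
# /etc/sysconfig/logstash
# -w sets the number of pipeline workers, -b the pipeline batch size.
# 8 is an illustrative value; the original setting was -w 4.
LS_OPTS="${LS_OPTS} -b 500 -w 8"
```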

It may be beneficial for you to post in the Logstash topic with your LS config, the test setup, and the results you got. There you may get a bit more visibility on the topic of tuning from the Logstash experts.

