Filebeat to Logstash - "client is not connected"

filebeat , logstash , elasticsearch : all latest versiions

Hi ,

I have very strange problem , filebeat sends data to logstash , but after sometime it stops , errors in the log:

Failed to publish events caused by: client is not connected
2018-01-24T21:59:42Z ERR  Failed to publish events: client is not connected
2018-01-24T21:59:42Z INFO retryer: send unwait-signal to consumer
2018-01-24T21:59:42Z INFO   done

logstash and elasticsearch running on the same machine with 32g ram , each have been configured with 8g heap , no errors in logstash config.

I searched this error many times and applied all the changes but no luck so for.

i have set the client timeout to 240000

Please share the configurations you are using for Filebeat and Logstash.

What version?
What OS?

rhel 7

filebeat latest version

i just made the bulk size to 200 from 1024. now its working and the issue is gone for now. what can i do to keep the bulk size bigger and avoid the issue?

Can you please share the filebeat.yml configuration file? Also, how are you starting it?

What version number?

Please share your configurations to help remove ambiguity.

To which configuration setting are you referring? The default output.logstash.bulk_max_size is 2048 according to the docs. Or are you talking about the ES output in Logstash?

With the client inactivity timeout increased LS shouldn't be disconnecting the clients. Back-pressure from ES shouldn't cause disconnection either.

If you run logstash with trace logging enabled this might help figure out why the client is disconnecting (--log.level trace).

@ijazadm When you say its "stops", do you mean it never recover from that state and no more events are sent to ES?

yes output.logstash.bulk_max_size , when i set it to 1024 , the problem occurs with that error on the client filebeat log , no error in logstash , the port is still listening

yes , it never recovers from that error and no events are sent to ELK cluster

###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
# You can find the full configuration reference here:

# For more available modules and options, please see the filebeat.reference.yml sample
# configuration file.

#=========================== Filebeat prospectors =============================


# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- type: log

  # Change to true to enable this prospector configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
    - /usr/local/zeus/log/*.log
    #- c:\programdata\elasticsearch\logs\*

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  exclude_lines: ['.*HTTP/unknown.*']

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ['^ERR', '^WARN']

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  exclude_files: ['errors.log']

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #  level: debug
  #  review: 1

  ### Multiline options

  # Mutiline can be used for log messages spanning multiple lines. This is common
  # for Java Stack Traces or C-Line Continuation

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
  #multiline.pattern: ^\[

  # Defines if the pattern set under pattern should be negated or not. Default is false.
  #multiline.negate: false

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
  # that was (not) matched before or after or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent to to next in Logstash
  #multiline.match: after

#============================= Filebeat modules ===============================

  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  #Reload.enabled: true

  # Period on which files under path should be checked for changes
  reload.period: 5s

#==================== Elasticsearch template setting ==========================

  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#  env: staging

#============================== Dashboards =====================================
# These settings control loading the sample dashboards to the Kibana index. Loading
# the dashboards is disabled by default and can be enabled either by setting the
# options here, or by using the `-setup` CLI flag or the `setup` command.
#setup.dashboards.enabled: false

# The URL from where to download the dashboards archive. By default this URL
# has a value which is computed based on the Beat name and version. For released
# versions, this URL points to the dashboard archive on the
# website.

#============================== Kibana =====================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.

  # Kibana Host
  # Scheme and port can be left out and will be set to the default (http and 5601)
  # In case you specify and additional path, the scheme is required: http://localhost:5601/path
  # IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
  #host: "localhost:5601"

#============================= Elastic Cloud ==================================

# These settings simplify using filebeat with the Elastic Cloud (

# The setting overwrites the `output.elasticsearch.hosts` and
# `` options.
# You can find the `` in the Elastic Cloud web UI.

# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `<user>:<pass>`.

#================================ Outputs =====================================

# Configure what output to use when sending the data collected by the beat.

#-------------------------- Elasticsearch output ------------------------------
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
  # The Logstash hosts
  hosts: [""]
  bulk_max_size: 200

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"
cat /usr/share/logstash/config/logstash.yml "localhost"
path.config: /usr/share/logstash/pipeline

from ELK monitoring , the cluster is under utilized , but in that pipeline i see the ip2location filter to be around 300% , that can be the cuase?

i have also tried to fix it by configuring queuing and buffering for logstash , and on the filebeat side i have tried pipeline and async settings as well , but no luck

one other thing is , there are 3 ELK nodes in the cluster with same configuration , and i have three clients with same configuration sending data to each ELK node,

one to one mapping.

but i have also tried to configure each client with all three nodes as sinks , so that if one fails other will be tried , and i have also tried to run filebeat in loadbalance mode over three ELK nodes , but that was worse.

should i go to low level debugging , using tcpdump and strace etc?

I would really like to see some Logstash logs when this situation happen. Same as @andrewkroh requested Filebeat to Logstash - "client is not connected"

Concerning ip2location, I don't know this plugin.

@ijazadm Just to clarify, I think there is an error happening but this error is not show in the normal log level. :frowning:

ok , i just set the bulk size to deafult 2048 and the error just occured after 5 minuts

filebeat log: registrar.states.current=22
2018-01-26T16:19:49Z ERR  Failed to publish events caused by: read tcp> i/o timeout
2018-01-26T16:19:49Z ERR  Failed to publish events caused by: read tcp> i/o timeout
2018-01-26T16:19:49Z ERR  Failed to publish events caused by: client is not connected
2018-01-26T16:19:50Z ERR  Failed to publish events: client is not connected

logstash log with tracce level:

[2018-01-26T16:19:18,565][DEBUG][logstash.pipeline        ] filter received {"event"=>{"offset"=>336916228, "prospector"=>{"type"=>"log"}, "source"=>"/usr/local/zeus/log/wwwint.log", "@version"=>"1", "message"=>" - - [26/Jan/2018:07:38:30 +0000] \"GET /solr/citations/query?qt=%2Fquery&cursorMark=*&rows=25&sort=score+desc%2C+id+desc&q=pmc34995&DEBUG_QUERY=false&fl=id%2CEXT_ID%2CSRC%2CPMID%2CPMCID%2CTITLE_DISPLAY%2CDOI%2CJOURNAL_DISPLAY%2CAUTH_LIST%2CISSUE%2CVOLUME%2CPUB_YEAR%2CISSN%2CPAGE_INFO%2CPUB_TYPE%2COPEN_ACCESS%2CIN_EPMC%2CIN_PMC%2CHAS_PDF%2CHAS_BOOK%2CHAS_SUPPL%2CCITED%2CHAS_REFLIST%2CHAS_TM%2CHAS_XREFS%2CHAS_LABSLINKS%2CACCESSION_TYPE%2CBOOK_ID%2CHAS_EMBL%2CHAS_OMIM%2CHAS_UNIPROT%2CHAS_ARXPR%2CHAS_CRD%2CHAS_FULLTEXT%2CHAS_ABSTRACT%2CHAS_INTERPRO%2CHAS_INTACT%2CHAS_CHEMBL%2CHAS_PDB%2CFIRST_PDATE%2C&wt=javabin&version=2 HTTP/1.1\" 200 754 \"-\" \"Solr[org.apache.solr.client.solrj.impl.HttpSolrClient] 1.0\" ves-pg-37:8983 0.009835", "host"=>"", "beat"=>{"hostname"=>"", "name"=>"", "version"=>"6.1.2"}, "tags"=>["beats_input_codec_plain_applied"], "@timestamp"=>2018-01-26T16:17:53.278Z}}
[2018-01-26T16:19:42,396][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline {:pipeline_id=>".monitoring-logstash", :thread=>"#<Thread:0x7c0212cd sleep>"}
[2018-01-26T16:19:42,399][DEBUG][logstash.instrument.periodicpoller.cgroup] Error, cannot retrieve cgroups information {:exception=>"Errno::ENOENT", :message=>"No such file or directory - /sys/fs/cgroup/cpuacct/system.slice/docker-6993298369b33108f6a4da36600cc209a56e65a3fbf59c24690fc97e11684e2a.scope/cpuacct.usage"}
[2018-01-26T16:19:42,407][DEBUG][logstash.licensechecker.licensemanager] updating observers of xpack info change
[2018-01-26T16:19:42,407][DEBUG][logstash.inputs.metrics  ] updating licensing state installed:true,
          license:{"status"=>"active", "uid"=>"63a2b316-a5ae-47c0-b966-03cb02decea5", "type"=>"basic", "issue_date"=>"2018-01-18T00:00:00.000Z", "issue_date_in_millis"=>1516233600000, "expiry_date"=>"2019-01-18T23:59:59.999Z", "expiry_date_in_millis"=>1547855999999, "max_nodes"=>100, "issued_to"=>"ijaz ahmad (EMBL-EBI)", "issuer"=>"Web Form", "start_date_in_millis"=>1516233600000},
[2018-01-26T16:19:50,748][DEBUG][logstash.inputs.metrics  ] Metrics input: received a new snapshot {:created_at=>2018-01-26 16:19:50 UTC, :snapshot=>#<LogStash::Instrument::Snapshot:0x37262918 @metric_store=#<LogStash::Instrument::MetricStore:0x39ebd59c @store=#<Concurrent::Map:0x00000000000fbc entries=3 default_proc=nil>, @structured_lookup_mutex=#<Mutex:0x16b5d67f>, @fast_lookup=#<Concurrent::Map:0x00000000000fc0 entries=94 default_proc=nil>>, @created_at=2018-01-26 16:19:50 UTC>}
[2018-01-26T16:19:50,749][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline {:pipeline_id=>".monitoring-logstash", :thread=>"#<Thread:0x7c0212cd sleep>"}
[2018-01-26T16:19:50,752][DEBUG][logstash.instrument.periodicpoller.cgroup] Error, cannot retrieve cgroups information {:exception=>"Errno::ENOENT", :message=>"No such file or directory - /sys/fs/cgroup/cpuacct/system.slice/docker-6993298369b33108f6a4da36600cc209a56e65a3fbf59c24690fc97e11684e2a.scope/cpuacct.usage"}
[2018-01-26T16:19:50,841][DEBUG][logstash.pipeline        ] filter received {"event"=>{"pipelines"=>[{"events"=>{"in"=>8848, "out"=>5845, "queue_push_duration_in_millis"=>161928, "filtered"=>5845, "duration_in_millis"=>871128}, "ephemeral_id"=>"e49242ea-88f6-47ea-889e-b4ff16458160", "queue"=>{"type"=>"memory", "events_count"=>0}, "reloads"=>{"failures"=>0, "successes"=>0}, "vertices"=>[{"pipeline_ephemeral_id"=>"e49242ea-88f6-47ea-889e-b4ff16458160", "events_out"=>8848, "id"=>:b9df593cd1f475bfea38253daee0ec2d067d4472e0d83330232cfcc0d43e86c3, "queue_push_duration_in_millis"=>161928}, {"events_out"=>6845, "long_counters"=>[{"name"=>"matches", "value"=>6845}, {"name"=>"failures", "value"=>0}], "pipeline_ephemeral_id"=>"e49242ea-88f6-47ea-889e-b4ff16458160", "events_in"=>6845, "id"=>:"75c0d606f1d0f5ab5502612783ad2536e3b05bfb63f52a0eb39538bc8632b2ad", "duration_in_millis"=>6663}, {"events_out"=>5845, "pipeline_ephemeral_id"=>"e49242ea-88f6-47ea-889e-b4ff16458160", "events_in"=>6845, "id"=>:d912bc956f96847e33a3244650515ee9553660f397a00ff250b5e429521cf7d4, "duration_in_millis"=>809795}, {"events_out"=>5845, "pipeline_ephemeral_id"=>"e49242ea-88f6-47ea-889e-b4ff16458160", "events_in"=>5845, "id"=>:"56824aac4fa7991721d7c74ba2d7db945c118159806ab95c620a233f2ff97647", "duration_in_millis"=>7352}, {"events_out"=>5845, "pipeline_ephemeral_id"=>"e49242ea-88f6-47ea-889e-b4ff16458160", "events_in"=>5845, "id"=>:c604578ee83b84f65a5f15168e9cccc40e81512731a2a51114d1c44f9d3980da, "duration_in_millis"=>26726}], "hash"=>"7f67fb947beea1d4465543689e3137c4a9f73a0b44e68fa9f917f30002196ce3", "id"=>"main"}], "process"=>{"cpu"=>{"percent"=>21}, "open_file_descriptors"=>129, "max_file_descriptors"=>1048576}, "timestamp"=>2018-01-26T16:19:50.748Z, "os"=>{"cpu"=>{"load_average"=>{"1m"=>3.17, "15m"=>1.78, "5m"=>2.19}}}, "logstash"=>{"uuid"=>"b94361d0-d866-4209-8339-967ffa8721c0", "name"=>"wp-p3s-
[2018-01-26T16:19:50,848][TRACE][] Running: READ_HEADER
[2018-01-26T16:19:50,849][TRACE][] Frame version 2 detected
[2018-01-26T16:19:50,849][TRACE][] Transition, from: READ_HEADER, to: READ_FRAME_TYPE, requiring 1 bytes
[2018-01-26T16:19:50,849][TRACE][] Transition, from: READ_FRAME_TYPE, to: READ_WINDOW_SIZE, requiring 4 bytes
[2018-01-26T16:19:50,849][TRACE][] Running: READ_WINDOW_SIZE
[2018-01-26T16:19:50,849][TRACE][] Transition, from: READ_WINDOW_SIZE, to: READ_HEADER, requiring 1 bytes
[2018-01-26T16:19:50,849][TRACE][] Running: READ_HEADER
[2018-01-26T16:19:50,849][TRACE][] Frame version 2 detected
[2018-01-26T16:19:50,849][TRACE][] Transition, from: READ_HEADER, to: READ_FRAME_TYPE, requiring 1 bytes
[2018-01-26T16:19:50,849][TRACE][] Transition, from: READ_FRAME_TYPE, to: READ_COMPRESSED_FRAME_HEADER, requiring 4 bytes

Can you create a gist with at least a 1000 line before and after the event?