Filebeat clients unable to publish events to logstash

Hi all,

I had a filebeat to logstash setup running successfully for the past months where both services were running on the same host. I recently moved filebeat to its own server in a different zone. I am able to hit port 5044 on my logstash server, and filebeat test output also shows everything as OK. Running ss -tnp also shows established connections from the filebeat host. However, when filebeat tries to send new log messages to logstash I get the errors below. I've tried applying the ttl: 60 and pipelining: 0 settings, but so far that has not fixed the issue. Does anyone have an idea what else I can look at here?

Versions

filebeat version 9.0.0
logstash 8.11.1

Filebeat Error 1

"Failed to publish events caused by: read tcp 10.1.100.10:42948->10.110.10.10:5044: i/o timeout"
"Failed to publish events caused by: client is not connected"

Filebeat Error 2 (when running without the ttl and pipelining settings)

failed to publish events: write tcp 10.1.100.10:55088->10.110.10.10:5044: write: connection reset by peer

Logstash Error

[2025-05-27T06:26:04,840][INFO ][org.logstash.beats.BeatsHandler][main][19e7f9959b025af35386fcbea7871c038e6e6d82f795909d15016386b9e6a6d0] [local: 10.110.10.10:5044, remote: 10.1.100.10:42948] Handling exception: java.net.SocketException: Connection reset (caused by: java.net.SocketException: Connection reset)
[2025-05-27T06:26:04,841][WARN ][io.netty.channel.DefaultChannelPipeline][main][19e7f9959b025af35386fcbea7871c038e6e6d82f795909d15016386b9e6a6d0] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.net.SocketException: Connection reset
        at sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:394) ~[?:?]
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:426) ~[?:?]
        at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:254) ~[netty-buffer-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[netty-buffer-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at java.lang.Thread.run(Thread.java:840) [?:?]

filebeat.yml

filebeat.inputs:
- type: filestream
  id: access-logs
  enabled: true
  paths:
    - /mnt/*/log/access.log

  # Enable file identity fingerprint
  prospector.scanner.fingerprint.enabled: true
  prospector.scanner.fingerprint.offset: 0
  prospector.scanner.fingerprint.length: 64
  file_identity.fingerprint: ~
  clean_removed: false

output.logstash:
  # The Logstash hosts
  hosts: ["10.110.10.10:5044"]
  ttl: 60         # Tested with and without
  pipelining: 0   # Tested with and without

processors:
  - add_host_metadata: ~

logstash pipeline config

input {
  beats {
    port => 5044
  }
}

...

Testing

[root@localhost]# filebeat test config
Config OK
[root@localhost]# filebeat test output
logstash: 10.110.10.10:5044...
  connection...
    parse host... OK
    dns lookup... OK
    addresses: 10.110.10.10
    dial up... OK
  TLS... WARN secure connection disabled
  talk to server... OK

Have you tried without ttl: 60 and pipelining?

The "ttl" option is not yet supported on an async Logstash client (one with the "pipelining" option set).

Yes I have. When leaving out those settings I do see messages sent to logstash, but I am also seeing the below errors on the filebeat node.

{"message":"Failed to publish events caused by: write tcp 10.1.100.10:43834->10.110.10.10:5044: write: connection reset by peer","service.name":"filebeat","ecs.version":"1.6.0"}
{"message":"failed to publish events: write tcp 10.1.100.10:43834->10.110.10.10:5044: write: connection reset by peer","service.name":"filebeat","ecs.version":"1.6.0"}
{"message":"Connecting to backoff(async(tcp://10.110.10.10:5044))","service.name":"filebeat","ecs.version":"1.6.0"}
{"message":"Connection to backoff(async(tcp://10.110.10.10:5044)) established","service.name":"filebeat","ecs.version":"1.6.0"}

Do you have anything between filebeat and logstash?

Every log you shared until now suggests network issues.

There is nothing reported about the version difference. Could it be because of the versions? What do you think, Leandro?
filebeat version 9.0.0, logstash 8.11.1

I would expect a different error.

@MajorNickle can you upgrade Logstash to 8.18.1 to rule out a version compatibility issue?

I upgraded my logstash instance to 9.0.1 yesterday but am still seeing those errors. I set my logstash logging level to debug and do not see any errors or hints regarding a connection reset either. I don't have any firewall in between those two VMs as they're on the same subnet. I do, however, have a local firewall enabled on the logstash server with port 5044/tcp open, but it doesn't make a difference when stopping firewalld; I get the same connection reset errors.

Both systems run on RockyLinux 9.5 with a minimal install.

Good, just to make sure

As Leandro said, it might be a network issue since the main error is connection reset by peer; check your network devices.
Can you set debug mode on both sides, FB and LS? Is there any trace in that case?
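
Something like this should do it (a minimal sketch; exact file locations depend on your install):

# filebeat.yml
logging.level: debug

# logstash.yml
log.level: debug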

Yep, let me set log.level to debug on the fb side as well. I'll report back what I find.

Maybe there will be some trace in the logs on both sides. If the same unclear message is still there, which it most likely will be, check the firewall and network paths.
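
To rule out the host firewall on the logstash side, something like this (firewalld on Rocky assumed):

firewall-cmd --state
firewall-cmd --list-all        # 5044/tcp should show up under ports
ss -tlnp | grep 5044           # confirm logstash is actually listening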

By the way, have you tried telnet/curl to 10.110.10.10:5044?

Yep, I can telnet to 5044 no problem. I can also see the established connection using ss -tnp | grep ":5044". Below is the trace from a new message being detected by filebeat. I see the message being received by logstash as well.

DEBUG        [file_watcher]        map[file.line:229 file.name:filestream/fswatch.go function:github.com/elastic/beats/v7/filebeat/input/filestream.(*fileWatcher).watch]        File scan complete        {"service.name": "filebeat", "total": 1, "written": 0, "truncated": 0, "renamed": 0, "removed": 0, "created": 0, "ecs.version": "1.6.0"}
DEBUG        [input.filestream]        map[file.line:131 file.name:filestream/filestream.go function:github.com/elastic/beats/v7/filebeat/input/filestream.(*logFile).Read]        End of file reached: /mnt/my-nas/log/access.log; Backoff now.        {"service.name": "filebeat", "id": "access-logs", "source_file": "filestream::access-logs::fingerprint::a6b2891bb9e998fcff459bf1a2abe73c41a261e92d971965ea83c87d74b002d5", "path": "/mnt/my-nas/log/access.log", "state-id": "fingerprint::a6b2891bb9e998fcff459bf1a2abe73c41a261e92d971965ea83c87d74b002d5", "ecs.version": "1.6.0"}
DEBUG        [transport]        map[file.line:211 file.name:transport/client.go function:github.com/elastic/elastic-agent-libs/transport.(*Client).handleError]        handle error: write tcp 10.1.100.10:44082->10.110.10.10:5044: write: connection reset by peer        {"service.name": "filebeat", "ecs.version": "1.6.0"}
DEBUG        [transport]        map[file.line:124 file.name:transport/client.go function:github.com/elastic/elastic-agent-libs/transport.(*Client).Close]        closing        {"service.name": "filebeat", "ecs.version": "1.6.0"}
DEBUG        [logstash]        map[file.line:174 file.name:logstash/async.go function:github.com/elastic/beats/v7/libbeat/outputs/logstash.(*asyncClient).Publish]        1 events out of 1 events sent to logstash host 10.110.10.10:5044. Continue sending        {"service.name": "filebeat", "ecs.version": "1.6.0"}
DEBUG        [logstash]        map[file.line:129 file.name:logstash/async.go function:github.com/elastic/beats/v7/libbeat/outputs/logstash.(*asyncClient).Close]        close connection        {"service.name": "filebeat", "ecs.version": "1.6.0"}
ERROR        [logstash]        map[file.line:285 file.name:logstash/async.go function:github.com/elastic/beats/v7/libbeat/outputs/logstash.(*msgRef).dec]        Failed to publish events caused by: write tcp 10.1.100.10:44082->10.110.10.10:5044: write: connection reset by peer        {"service.name": "filebeat", "ecs.version": "1.6.0"}
DEBUG        [logstash]        map[file.line:129 file.name:logstash/async.go function:github.com/elastic/beats/v7/libbeat/outputs/logstash.(*asyncClient).Close]        close connection        {"service.name": "filebeat", "ecs.version": "1.6.0"}
ERROR        [publisher_pipeline_output]        map[file.line:174 file.name:pipeline/client_worker.go function:github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*netClientWorker).publishBatch]        failed to publish events: write tcp 10.1.100.10:44082->10.110.10.10:5044: write: connection reset by peer        {"service.name": "filebeat", "ecs.version": "1.6.0"}
INFO        [publisher_pipeline_output]        map[file.line:138 file.name:pipeline/client_worker.go function:github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*netClientWorker).run]        Connecting to backoff(async(tcp://10.110.10.10:5044))        {"service.name": "filebeat", "ecs.version": "1.6.0"}
DEBUG        [logstash]        map[file.line:121 file.name:logstash/async.go function:github.com/elastic/beats/v7/libbeat/outputs/logstash.(*asyncClient).Connect]        connect        {"service.name": "filebeat", "ecs.version": "1.6.0"}
INFO        [publisher_pipeline_output]        map[file.line:146 file.name:pipeline/client_worker.go function:github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*netClientWorker).run]        Connection to backoff(async(tcp://10.110.10.10:5044)) established        {"service.name": "filebeat", "ecs.version": "1.6.0"}
DEBUG        [logstash]        map[file.line:174 file.name:logstash/async.go function:github.com/elastic/beats/v7/libbeat/outputs/logstash.(*asyncClient).Publish]        1 events out of 1 events sent to logstash host 10.110.10.10:5044. Continue sending        {"service.name": "filebeat", "ecs.version": "1.6.0"}
DEBUG        [publisher]        map[file.line:80 file.name:memqueue/ackloop.go function:github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).handleBatchSig]        ackloop: return ack to broker loop:1        {"service.name": "filebeat", "ecs.version": "1.6.0"}
DEBUG        [publisher]        map[file.line:82 file.name:memqueue/ackloop.go function:github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).handleBatchSig]        ackloop:  done send ack        {"service.name": "filebeat", "ecs.version": "1.6.0"}
DEBUG        [file_watcher]        map[file.line:125 file.name:filestream/fswatch.go function:github.com/elastic/beats/v7/filebeat/input/filestream.(*fileWatcher).watch]        Start next scan        {"service.name": "filebeat", "ecs.version": "1.6.0"}
DEBUG        [file_watcher]        map[file.line:229 file.name:filestream/fswatch.go function:github.com/elastic/beats/v7/filebeat/input/filestream.(*fileWatcher).watch]        File scan complete        {"service.name": "filebeat", "total": 1, "written": 1, "truncated": 0, "renamed": 0, "removed": 0, "created": 0, "ecs.version": "1.6.0"}
DEBUG        [input.filestream]        map[file.line:262 file.name:filestream/prospector.go function:github.com/elastic/beats/v7/filebeat/input/filestream.(*fileProspector).onFSEvent]        File /mnt/my-nas/log/access.log has been updated        {"service.name": "filebeat", "id": "access-logs", "prospector": "file_prospector", "operation": "write", "source_name": "fingerprint::a6b2891bb9e998fcff459bf1a2abe73c41a261e92d971965ea83c87d74b002d5", "fingerprint": "a6b2891bb9e998fcff459bf1a2abe73c41a261e92d971965ea83c87d74b002d5", "os_id": "111676592-43", "new_path": "/mnt/my-nas/log/access.log", "old_path": "/mnt/my-nas/log/access.log", "ecs.version": "1.6.0"}
DEBUG        [input.filestream]        map[file.line:140 file.name:input-logfile/harvester.go function:github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.(*defaultHarvesterGroup).Start]        Starting harvester for file        {"service.name": "filebeat", "id": "access-logs", "source_file": "filestream::access-logs::fingerprint::a6b2891bb9e998fcff459bf1a2abe73c41a261e92d971965ea83c87d74b002d5", "ecs.version": "1.6.0"}
DEBUG        [input.filestream]        map[file.line:206 file.name:input-logfile/harvester.go function:github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.startHarvester.func1]        Stopped harvester for file        {"service.name": "filebeat", "id": "access-logs", "source_file": "filestream::access-logs::fingerprint::a6b2891bb9e998fcff459bf1a2abe73c41a261e92d971965ea83c87d74b002d5", "ecs.version": "1.6.0"}
DEBUG        [input.filestream]        map[file.line:131 file.name:filestream/filestream.go function:github.com/elastic/beats/v7/filebeat/input/filestream.(*logFile).Read]        End of file reached: /mnt/my-nas/log/access.log; Backoff now.        {"service.name": "filebeat", "id": "access-logs", "source_file": "filestream::access-logs::fingerprint::a6b2891bb9e998fcff459bf1a2abe73c41a261e92d971965ea83c87d74b002d5", "path": "/mnt/my-nas/log/access.log", "state-id": "fingerprint::a6b2891bb9e998fcff459bf1a2abe73c41a261e92d971965ea83c87d74b002d5", "ecs.version": "1.6.0"}
DEBUG        [input.filestream]        map[file.line:131 file.name:filestream/filestream.go function:github.com/elastic/beats/v7/filebeat/input/filestream.(*logFile).Read]        End of file reached: /mnt/my-nas/log/access.log; Backoff now.        {"service.name": "filebeat", "id": "access-logs", "source_file": "filestream::access-logs::fingerprint::a6b2891bb9e998fcff459bf1a2abe73c41a261e92d971965ea83c87d74b002d5", "path": "/mnt/my-nas/log/access.log", "state-id": "fingerprint::a6b2891bb9e998fcff459bf1a2abe73c41a261e92d971965ea83c87d74b002d5", "ecs.version": "1.6.0"}
INFO        [monitoring]        map[file.line:192 file.name:log/log.go function:github.com/elastic/beats/v7/libbeat/monitoring/report/log.(*reporter).logSnapshot]        Non-zero metrics in the last 30s        {"service.name": "filebeat", "monitoring": {"metrics": {"beat":{"cgroup":{"memory":{"mem":{"usage":{"bytes":65056768}}}},"cpu":{"system":{"ticks":60,"time":{"ms":10}},"total":{"ticks":170,"time":{"ms":20},"value":170},"user":{"ticks":110,"time":{"ms":10}}},"handles":{"limit":{"hard":524288,"soft":524287},"open":12},"info":{"ephemeral_id":"1f7268f7-be10-4ff7-a48e-f2c6c351e81d","uptime":{"ms":150035},"version":"9.0.0"},"memstats":{"gc_next":44974762,"memory_alloc":24073472,"memory_total":43249568,"rss":150601728},"runtime":{"goroutines":32}},"filebeat":{"harvester":{"open_files":1,"running":1}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":1,"active":0,"batches":2,"failed":1,"total":2},"read":{"bytes":6},"write":{"bytes":641,"errors":1,"latency":{"histogram":{"count":3,"max":22,"mean":10,"median":8,"min":0,"p75":22,"p95":22,"p99":22,"p999":22,"stddev":9.092121131323903}}}},"pipeline":{"clients":1,"events":{"active":0,"published":1,"retry":2,"total":1},"queue":{"acked":1,"added":{"events":1},"consumed":{"events":1},"filled":{"bytes":0,"events":0,"pct":0},"max_bytes":0,"max_events":3200,"removed":{"events":1}}}},"registrar":{"states":{"current":0}},"system":{"load":{"1":0,"15":0,"5":0,"norm":{"1":0,"15":0,"5":0}}}}, "ecs.version": "1.6.0"}}
DEBUG        [file_watcher]        map[file.line:125 file.name:filestream/fswatch.go function:github.com/elastic/beats/v7/filebeat/input/filestream.(*fileWatcher).watch]        Start next scan        {"service.name": "filebeat", "ecs.version": "1.6.0"}

So I took some time and stood up a brand new filebeat and logstash setup on my local machine using two RockyLinux 9.5 minimal installs on a private network. I used the same filebeat config and a very minimal logstash pipeline outputting to stdout, but I'm still seeing the same errors. I also tried the logstash docker version, but again I'm seeing the same errors. Can someone else replicate this? I also tried an older filebeat version, 8.11.1. I'm a little out of ideas at this point.

/etc/logstash/conf.d/to_confluent.conf

input {
  beats {
    port => 5044
  }
}

output {
    stdout { codec => rubydebug }
}

error

{"log.level":"error","@timestamp":"2025-05-28T15:07:37.879-0700","log.logger":"logstash","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/logstash.(*msgRef).dec","file.name":"logstash/async.go","file.line":285},"message":"Failed to publish events caused by: write tcp 192.168.220.129:54166->192.168.220.130:5044: write: connection reset by peer","service.name":"filebeat","ecs.version":"1.6.0"}

Are there any errors in the LS logs (in debug mode) except this one?

{"log.level":"error","@timestamp":"2025-05-28T15:07:37.879-0700","log.logger":"logstash","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/logstash.(*msgRef).dec","file.name":"logstash/async.go","file.line":285},"message":"Failed to publish events caused by: write tcp 192.168.220.129:54166->192.168.220.130:5044: write: connection reset by peer","service.name":"filebeat","ecs.version":"1.6.0"}

No errors on this local-to-local setup. However, I just did a little more testing with other clients I have access to: 1x filebeat server in London and 1x logstash server in New York. I see the same connection reset by peer error on the filebeat client and see some messages going through, but after 2-3 or so I see the java.net.SocketException: Connection reset error from above again on the logstash end.

Hi @MajorNickle

Please show the exact telnet command and output when you telnet from the filebeat host to the logstash host on port 5044.

Also, you can run the filebeat output test:

filebeat test output

The reason I ask is that this forum is full of posts from users that are sure there is no firewall or network configuration that would block these connections ... only to learn later that there is ... even some of our most experienced users.

For example
I had a very frustrating example of this myself, when it turned out that the terminal I was running inside VS Code had started applying local firewall rules as one of its new security features....

And so my config would run as a service, but when I ran it from inside VS Code it would not.... I was up until 3:00 a.m. figuring that one out!

I am not saying that's what's happening with you, but it is almost always firewall / network related, either in the network or at the host level.

@stephenb sure thing. Below are the outputs. I've also outlined the steps I did yesterday while testing on my local workstation vs my prod network. When testing sending messages I just cat a test_messages file and echo >> it to the log file at /mnt/my-nas/log/access.log (see the small sketch after the sample message below). The test_messages file has about 100 lines looking similar to the sample message below. Hope these details help.

telnet

[root@localhost filebeat]# telnet 10.110.10.10 5044
Trying 10.110.10.10...
Connected to 10.110.10.10.

filebeat test config

[root@localhost filebeat]# filebeat test config
Config OK
[root@localhost filebeat]# filebeat test output
logstash: 10.110.10.10:5044...
  connection...
    parse host... OK
    dns lookup... OK
    addresses: 10.110.10.10
    dial up... OK
  TLS... WARN secure connection disabled
  talk to server... OK

Steps to reproduce

  1. Set up 2x new VMs with a RockyLinux 9.5 minimal install
  2. Updated packages dnf update -y
  3. Installed filebeat 9.0.1 on one host using the install guide and rpm
  4. Installed logstash 9.0.1 on the other host using the elastic repo
  5. Added the below filebeat config.
  6. Added the below logstash config.
  7. Allowed incoming 5044/tcp on the logstash server firewall-cmd --add-port=5044/tcp --permanent and firewall-cmd --reload
  8. Verified I can telnet from filebeat to logstash using port 5044.
  9. Start both services.
  10. For testing I'm echoing some of the expected log messages (below) to e.g. /mnt/my-nas/log/access.log
  11. journalctl -u filebeat -f will show the connection reset by peer messages.

filebeat.yml

filebeat.inputs:
- type: filestream
  id: access-logs
  enabled: true
  paths:
    - /mnt/*/log/access.log
  prospector.scanner.fingerprint:
    enabled: true
    offset: 0
    length: 64
  file_identity.fingerprint: ~
  clean_removed: false
  close.on_state_change.inactive: 48h

output.logstash:
  hosts: ["10.110.10.10:5044"]
  bulk_max_size: 128
  timeout: 60

processors:
  - add_host_metadata: ~

to_confluent.conf

input {
  beats {
    port => 5044
  }
}

output {
    stdout { codec => rubydebug }
}

expected log messages (spaces do matter here unfortunately)

 23890 - [Thu May  8 10:36:36 2025] [Info   ] MXF ingested [4322d760-7156-4f1f-a8e2-acbf93167e68]
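
For completeness, the echo test looks roughly like this (paths as above; the single quotes keep the leading spaces intact):

# append the whole test file at once
cat test_messages >> /mnt/my-nas/log/access.log
# or append a single sample line
echo ' 23890 - [Thu May  8 10:36:36 2025] [Info   ] MXF ingested [4322d760-7156-4f1f-a8e2-acbf93167e68]' >> /mnt/my-nas/log/access.log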

For debugging purposes, when it gets down to the hard / unusual stuff, here is what I would recommend.

What I would do is pull down the tar.gz of each, untar, configure, and run each in the foreground... that should further isolate the issue.
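
For example, something like this (URLs and version are assumptions, adjust to whatever you are testing):

# URL / version are assumptions - adjust as needed
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-9.0.1-linux-x86_64.tar.gz
tar -xzf filebeat-9.0.1-linux-x86_64.tar.gz

curl -L -O https://artifacts.elastic.co/downloads/logstash/logstash-9.0.1-linux-x86_64.tar.gz
tar -xzf logstash-9.0.1-linux-x86_64.tar.gz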

Run filebeat:

./filebeat -e

or

./filebeat -e -d "*"

Run logstash with the -f option to specify the logstash config file:

./bin/logstash -r -f ./config/myconfig.conf


Thanks for sharing the detailed steps, it's so unusual that posters do so. I am almost tempted to just follow your steps to see if I can reproduce this; I might do so if this is still open tomorrow.

You might wish to capture the traffic inside the VMs on the relevant interfaces. I'm with the others who still think this is a local "network issue" of some sort; I might be wrong of course.
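
Something like this on each VM (interface name is a guess, check with ip a):

# capture the beats traffic on both ends
tcpdump -i eth0 -nn -w beats-5044.pcap 'tcp port 5044'

Comparing the two captures should show which end actually sends the RST.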

Idea: Try just disabling the firewalls completely in your VMs / RockyLinux 9.5 test system and see if the problem persists.
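
E.g. something like this on both hosts (firewalld assumed):

systemctl stop firewalld
systemctl status firewalld
nft list ruleset    # check nothing else is still filtering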

One thing I noticed:

If you really did the telnet step there before you even started logstash, then it wasn't logstash that you were "connected" to?

@stephenb Alright, this is interesting. I just tested both tar.gz versions, and the issue seems to be with the rpm-packaged logstash version. When using the tar.gz one I don't get the connection reset by peer messages at all. I confirmed that by also testing with both the rpm and tar.gz filebeat versions. Do you want me to test anything else?

@RainTown I was thinking of running some tcpdumps as well. I have already turned off both firewalls to eliminate them.

Sorry, it seems I wasn't patient enough. I still see the errors, but they only seem to pop up about 60s after the last message was successfully sent. 60s seems to be the default value for the client_inactivity_timeout option on the logstash beats input. Could this be related?
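
If it is, the relevant knob on the logstash side would presumably be something like this (300 is just an arbitrary example):

input {
  beats {
    port => 5044
    client_inactivity_timeout => 300   # default is 60 seconds
  }
}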