Logstash/Filebeat lag issue - Logs delayed by hours

Hello Elastic Community,

I am experiencing a significant lag in log ingestion: logs are arriving with a variable delay (sometimes hours late) despite a high-performance environment. I would appreciate your advice on tuning my pipeline for this specific hardware.

Single server: Elasticsearch + Logstash run on the same physical machine.
Sources: multiple servers sending logs via Filebeat.

The Problem:
Logs are queuing up at the Filebeat level. When I check Kibana, I see "recent" arrival times (@timestamp is current), but the actual log content (message) shows timestamps from 2+ hours ago. It seems like the pipeline "clogs" and then releases, causing variable delays.

(screenshot)

Logstash.yml:
pipeline.workers: 12 # Leaving ~20 cores for ES/OS
pipeline.batch.size: 2048

#JVM Heap is set to 8GB

Filebeat (Agents)
queue.mem:
  events: 4096
  flush.min_events: 512

output.logstash:
  hosts: ["LOGSTASHIP:4000", "LOGSTASHIP:4001", ...] # 4 ports listening
  worker: 4
  bulk_max_size: 2048 # Matched with Logstash batch size
  pipelining: 5
  loadbalance: true

I want to clarify that this is not a timezone or system clock issue. The delay is inconsistent: I am seeing some logs arriving in real time (live) while simultaneously receiving other logs with a 2-hour delay. If it were a timezone configuration error, the time offset would be constant for all logs, which is not the case here.

I don't know where the specific bottleneck is right now. If you need any additional metrics, configurations, or specific logs to help diagnose this, please let me know and I'll be happy to share them.

Hello and welcome,

Can you share the specs of the server? You mentioned that Logstash is using 8 GB of JVM heap, but didn't mention what the total memory is.

If you have just one Logstash server, why did you configure the output like this?

Also, share your entire Logstash pipeline as well as your full filebeat.yml; it is not clear what your Filebeat input is.

Use preformatted text (the </> button) to share those configurations.

You have logs that are arriving in real time and others with a delay? What is the difference between them? The servers? The application? Is this constant or does it change?

Thanks for looking into this.

Server Specs & Memory:
Sorry I missed the total memory in the first post.

CPU: 32 Cores
RAM: 1 TB (Yes, 1 Terabyte physical RAM)
Elasticsearch Heap: 31GB (To stay under compressed oops threshold).
Logstash Heap: 8GB.
OS Cache: The rest (~900GB) is left for the OS to handle filesystem caching (Lucene).

Since I have a high volume of logs and a single Logstash instance with many cores, I configured multiple listeners to avoid a single TCP thread bottleneck at the input stage.

filebeat.yml (it is almost the same config on every beat):

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

queue.mem:
  events: 2048
  flush.min_events: 512

filebeat.inputs:
- paths:
    - /apache-tomcat-ws*/logs/catalina.out
    - /var/log/tomcats/*/catalina.out
  exclude_lines: ['.*MemcachedConnection.*','.*INFO: Added.*','.*Shut down memcached client.*','.*Could not redistribute to another node.*','.*Conexiones activas.*','.*ZV - EvaluarCondicion.*','.*evaluacion de resultado.*','.*Hay nodos.*','.*Cache hit.*']
  document_type: _doc
  fields:
    log_type: "w"
  fields_under_root: true
  scan_frequency: 10s
  close_inactive: 5m
  clean_inactive: 15m
  ignore_older: 10m
  close_removed: true
  tail_files: true
  close_timeout: 15m
  close_renamed: true
  force_close_files: false
  harvester_buffer_size: 16384
  max_bytes: 10485760
  tags: ["wap"]
- paths:
    - /var/log/smpp*.log
  exclude_lines: ['.*MemcachedConnection.*','.*INFO: Added.*','.*Shut down memcached client.*','.*Could not redistribute to another node.*','.*Conexiones activas.*','.*ZV - EvaluarCondicion.*','.*evaluacion de resultado.*','.*Hay nodos.*','.*Cache hit.*']
  document_type: _doc
  fields:
    log_type: "colas-smpp"
  fields_under_root: true
  close_inactive: 1m
  clean_inactive: 2h
  ignore_older: 1h
  close_removed: true
  tail_files: true
  close_timeout: 2h
  close_renamed: true
  force_close_files: false
- paths:
    - /var/log/receptor*.log
  exclude_lines: ['.*MemcachedConnection.*','.*INFO: Added.*','.*Shut down memcached client.*','.*Could not redistribute to another node.*','.*Conexiones activas.*','.*ZV - EvaluarCondicion.*','.*evaluacion de resultado.*','.*Hay nodos.*','.*Cache hit.*']
  document_type: _doc
  fields:
    log_type: "receptores-sms"
  fields_under_root: true
  close_inactive: 1m
  clean_inactive: 5m
  ignore_older: 2m
  close_removed: true
  tail_files: true
  close_timeout: 2h
  close_renamed: true
  force_close_files: true
output.logstash:
  hosts: ["LOGSTASHIP:4000","LOGSTASHIP:4001","LOGSTASHIP:4002","LOGSTASHIP:4003"]
  loadbalance: true
  worker: 4
  pipelining: 5
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
        path: /var/log/filebeat
        name: filebeat.log
        keepfiles: 7
        permissions: 0600

logstash.yml (aside from the path locations, this is the only config I set):

# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
pipeline.workers: 12
#
# How many events to retrieve from inputs before sending to filters+workers
#
pipeline.batch.size: 2048
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
# pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: Enabling this can lead to data loss during shutdown
#
# pipeline.unsafe_shutdown: false
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" automatically enables ordering if the 'pipeline.workers' setting
# is also set to '1', and disables otherwise.
# "true" enforces ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" disables any extra processing necessary for preserving ordering.
#
# pipeline.ordered: auto
#
# Sets the pipeline's default value for `ecs_compatibility`, a setting that is
# available to plugins that implement an ECS Compatibility mode for use with
# the Elastic Common Schema.
# Possible values are:
# - disabled
# - v1
# - v8 (default)
# Pipelines defined before Logstash 8 operated without ECS in mind. To ensure a
# migrated pipeline continues to operate as it did before your upgrade, opt-OUT
# of ECS for the individual pipeline in its `pipelines.yml` definition. Setting
# it here will set the default for _all_ pipelines, including new ones.
#
# pipeline.ecs_compatibility: v8

Yes, the delay varies between 1 and 2 hours, sometimes 12. It is not related to a specific application type or server hardware; the delay affects logs of the same type indiscriminately. It is not the case that "Log Type A" arrives fast and "Log Type B" arrives slow.

If I filter in Kibana for a single log type (e.g., just catalina.out logs), I see a mix of timestamps: some records arrive in real-time, while others from the exact same source/type arrive with a huge delay.

You forgot to share the Logstash configuration, the one where you have your input, filter, and output.

You need to share this file as well.

Another thing: what are the versions of the tools? You didn't mention the Filebeat, Logstash, and Elasticsearch versions.

Sorry, I forgot; here it is.

input.conf:

input {
  stdin {
    codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
  }
  beats {
    port => "4000"
  }
  beats {
    port => "4001"
  }
  beats {
    port => "4002"
  }
  beats {
    port => "4003"
  }
}

output.conf:

output {
  stdout {
    codec =>  rubydebug {
      metadata => true
    }
  }
  if [clone_type] == "wap" {
    elasticsearch {
      hosts => [ "ELASTIC1:9200", "ELASTIC2:9200", "ELASTIC3:9200" ]
      manage_template => false
      index => "wap-%{+YYYY.MM.dd}"
    }
  }
  if [log_type] == "colas-smpp" {
    elasticsearch {
      hosts => [ "ELASTIC1:9200", "ELASTIC2:9200", "ELASTIC3:9200" ]
      manage_template => false
      index => "colas-smpp-%{+YYYY.MM.dd}"
    }
  }
  if [log_type] == "receptores-sms" {
    elasticsearch {
      hosts => [ "ELASTIC1:9200", "ELASTIC2:9200", "ELASTIC3:9200" ]
      manage_template => false
      index => "receptores-sms-%{+YYYY.MM.dd}"
    }
  }
}

filebeat: 7.17.20
logstash: 8.18.1
elasticsearch: 8.19.7

How do you know that?

I am seeing logs arriving now (@timestamp) that correspond to events from 2+ hours ago. This confirms that Filebeat is currently reading lines from the log files that were written hours ago, meaning it is working through a backlog accumulated during the previous bottleneck.

You do not have any filters in Logstash, just the inputs and outputs?

To be honest, it is pretty complicated to troubleshoot this without seeing some sample data from the source and how it looks in Kibana.

Also, I'm not sure that the @timestamp you have is coming from Logstash; I think it is coming from Filebeat, as Filebeat also generates a @timestamp field that would overwrite the Logstash field since you do not have any filters.

Can you add this filter to your pipeline, just after your input:

filter {
  ruby {
    code => "
      ls_timestamp = DateTime.now().strftime('%Y-%m-%dT%H:%M:%S.%L%Z')
      event.set('ls_timestamp', ls_timestamp)
    "
  }
}

This will create a new field named ls_timestamp with the time when Logstash processed the message.

With this field it will be possible to know exactly when Logstash processed the events.

Also, how are you running Logstash? As a service, right? You should also remove the stdin input and the stdout output; they just add noise and can impact performance, especially the stdout output, since everything will be written into the system log files.
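As a sketch, a trimmed input/output pair (keeping the same ports and hosts as in the files shared above, with the stdin input and stdout output removed) could look like:

```
# input.conf - beats listeners only
input {
  beats { port => 4000 }
  beats { port => 4001 }
  beats { port => 4002 }
  beats { port => 4003 }
}

# output.conf - elasticsearch outputs only (one branch shown)
output {
  if [log_type] == "colas-smpp" {
    elasticsearch {
      hosts => [ "ELASTIC1:9200", "ELASTIC2:9200", "ELASTIC3:9200" ]
      manage_template => false
      index => "colas-smpp-%{+YYYY.MM.dd}"
    }
  }
}
```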


I didn't send my filter, sorry; here it is:

filter {
  if [log_type] == "wap" {
    clone {
      clones => ['wap']
      add_field => { "clone_wap" => "true" }
      add_field => { "clone_type" => "wap" }
      remove_field => [ "log_type" ]
    }
    grok {
      break_on_match => true
      match => {
        "message" => "%{NUMBER}: %{NUMBER}, %{JAVACLASS}, AccionRealizada: %{GREEDYDATA:accion} resultado %{NUMBER:resultado} Parametros: %{SYSLOG5424SD}%{SYSLOG5424SD}%{SYSLOG5424SD} Celular: %{GREEDYDATA} SaldoPrincipal %{GREEDYDATA} Tiempo:%{NUMBER:tiempo}"
      }
      match => {
        "message" => "%{NUMBER}: Celular: %{NUMBER:celular},Transaccion: %{GREEDYDATA:metodo},resultado:%{NUMBER:resultado},tiempo: %{NUMBER:tiempo},solicitante:%{IP:solicitante},interfaceid:%{NUMBER:interfaz}"
      }
      match => {
        "message" => "%{BASE16NUM:fecha_hora}: Coloco en cola: %{WORD:cola} valor: %{GREEDYDATA:mensaje_rabbit}"
      }
      match => {
        "message" => "%{BASE16NUM:fecha_hora}: Coloco en cola: %{WORD:cola} exchange: %{DATA:exchange} ruta: %{DATA:ruta} valor: %{GREEDYDATA:mensaje_rabbit}"
      }
      match => {
        "message" => "%{LOGLEVEL}: %{BASE16NUM:fecha_hora}:%{DATA:numero_celular},%{GREEDYDATA}Body\>\<%{WORD}\:%{GREEDYDATA:metodo}\><phonenumber>%{INT:phonenumber}%{GREEDYDATA}<packageid>%{WORD:tipo_paquete}-%{WORD:paquete}%{GREEDYDATA}<interfaceid>%{WORD:interfaz}%{GREEDYDATA}<transactionid>%{WORD:id_transaccion}%{GREEDYDATA}"
      }
      match => {
        "message" => "%{BASE16NUM:fecha_hora}: Tiempo de acreditacion %{BASE10NUM:acreditacion} num %{DATA:numero_celular}: %{BASE10NUM:tiempo} id trans: %{BASE10NUM:id_transaccion} id respuesta: %{BASE10NUM:resultado}"
      }
      match => {
        "message" => "%{GREEDYDATA}Intentando ejecutar la regla \(%{WORD:regla}\) %{GREEDYDATA}accion\(%{WORD:accion}\): comentario %{GREEDYDATA}\[%{GREEDYDATA}_%{BASE10NUM:id_transaccion}# %{GREEDYDATA:paquete}] con elemento %{GREEDYDATA}\[%{GREEDYDATA:offer_id}\]"
      }
      match => {
        "message" => "%{BASE10NUM:fecha_hora}: %{DATA:numero_celular}, %{JAVACLASS}, AccionRealizada: %{GREEDYDATA:accion} resultado %{BASE10NUM:resultado} Parametros: %{GREEDYDATA}OFFERID, %{BASE10NUM:offer_id}%{GREEDYDATA}?%{BASE10NUM:id_transaccion}# %{GREEDYDATA:paquete}\]\[%{GREEDYDATA} FECHAEXPIRA, %{TIMESTAMP_ISO8601:fecha_expiracion}%{GREEDYDATA} Celular: %{GREEDYDATA} SaldoPrincipal %{GREEDYDATA} Tiempo:%{BASE10NUM:tiempo}"
      }
      match => {
        "message" => "%{BASE10NUM:fecha_hora}:%{SPACE}Celular:%{SPACE}%{DATA:numero_celular},Transaccion: %{GREEDYDATA:metodo},resultado:%{SPACE}%{INT:resultado},%{SPACE}tiempo:%{SPACE}%{INT:tiempo}%{SPACE},%{SPACE}solicitante:%{SPACE}%{IPV4:solicitante}%{SPACE},%{SPACE}interfaceid:%{INT:interfaz}"
      }
      match => {
        "message" => "%{GREEDYDATA}%{BASE16NUM:fecha_hora}:%{DATA:numero_celular},%{GREEDYDATA}Body><%{WORD}:%{GREEDYDATA:metodo}<phonenumber>%{INT:phonenumber}<%{GREEDYDATA}<accreditationcode>%{INT:acreditacion}<%{GREEDYDATA}"
      }
      match => {
        "message" => "%{GREEDYDATA}: %{SPACE}%{GREEDYDATA:fecha_hora}:%{IP:solicitante}:(?<message>(.|\r|\n)*)"
        overwrite => [ "message" ]
      }
      match => {
        "message" => "%{GREEDYDATA}: %{SPACE}%{GREEDYDATA:fecha_hora}:%{IP:solicitante}:(?<text1>(.|\n)*)Body(?<text2>(.|\n)*)%{GREEDYDATA}>\n%{SPACE}<(?<text3>(.|\n)*):(?<text4>(.|\/:|\n)*(.|\n)*)>(?<text5>(.|\n)*)soapenv:Body(?<text4>(.|\n)*)"
        overwrite => [ "message" ]
      }
      match => {
        "message" => "(?<message>(.|\r|\n)*)"
        overwrite => [ "message" ]
      }
      match => {
        "message" => "%{GREEDYDATA}: %{SPACE}%{GREEDYDATA:fecha_hora}:%{DATA:numero_celular},(?<message>(.|\r|\n)*)"
        overwrite => [ "message" ]
      }
    }
    if ([clone_type] != "wap" and [tiempo]) {
      mutate {
        remove_field => [ "message" ]
      }
    }
    if ([clone_type] != "wap" and [accion]) {
      mutate {
        replace => { "log_type" => "rendimientocbs" }
        remove_field => [ "message" ]
      }
    }
    if ([clone_type] != "wap" and [producto]) {
      mutate {
        replace => { "log_type" => "rendimiento" }
        remove_field => [ "message" ]
      }
    }
    if ([clone_type] != "wap" and [cola]) {
      mutate {
        replace => { "log_type" => "rabbit_message" }
        remove_field => [ "message" ]
      }
    }
    if ([clone_type] != "wap" and [paquete] and ![mensaje_rabbit]) {
      mutate {
        replace => { "log_type" => "packages" }
        remove_field => [ "message" ]
      }
    }
    if ([clone_type] != "wap" and [acreditacion]) {
      mutate {
        replace => { "log_type" => "acreditacion" }
        remove_field => [ "message" ]
      }
    }
    if ([clone_type] != "wap" and [offer_id])  {
      mutate {
        replace => { "log_type" => "rendimiento_cbs" }
        remove_field => [ "message" ]
      }
    }
    if ([clone_type] != "wap" and [metodo]) {
      mutate {
        replace => { "log_type" => "rendimiento" }
        remove_field => [ "message" ]
      }
    }
    mutate {
      convert => {
        "accion"     =>     "string"
        "acreditacion"     =>     "integer"
        "celular"    =>     "integer"
        "cola"     =>     "string"
        "exchange"     =>     "string"
        "fecha_hora"     =>     "integer"
        "http_verb"    =>     "string"
        "id_transaccion"     =>     "integer"
        "interfaz"     =>     "integer"
        "mensaje_rabbit"     =>     "string"
        "metodo"     =>     "string"
        "numero_celular"     =>     "string"
        "offer_id"     =>     "string"
        "paquete"    =>     "string"
        "producto"     =>     "string"
        "regla"    =>     "string"
        "resultado"    =>     "integer"
        "ruta"     =>     "string"
        "solicitante"    =>     "string"
        "tiempo"     =>     "integer"
        "tipo_paquete"     =>     "string"
      }
    }
  }
}

Can the 2-hour difference be related to different time zones? I'm not sure where the servers are deployed.

Have you checked the FB logs? Any trace there?
Have you tried investigating 1-2 sources on the FB side?
Why are there so many close_* settings in filebeat.yml?

In general, FB is much simpler than LS: it reads lines, with not much processing logic in most cases. There shouldn't be lags.

Both servers are in the same time zone; the problem is that some logs from the same server are arriving late.

I am seeing repeated "connection reset by peer" errors pointing to the Logstash ports (4001, 4002):

2026-02-18T18:37:37.458-0600    ERROR   [logstash]      logstash/async.go:280   Failed to publish events caused by: write tcp IP:PORT->LOGSTASHIP:4001: write: connection reset by peer
2026-02-18T18:37:37.458-0600    INFO    [publisher]     pipeline/retry.go:219   retryer: send unwait signal to consumer
2026-02-18T18:37:37.458-0600    INFO    [publisher]     pipeline/retry.go:223     done
2026-02-18T18:37:37.463-0600    ERROR   [logstash]      logstash/async.go:280   Failed to publish events caused by: write tcp IP:PORT->LOGSTASHIP:4002: write: connection reset by peer
2026-02-18T18:37:37.463-0600    INFO    [publisher]     pipeline/retry.go:219   retryer: send unwait signal to consumer
2026-02-18T18:37:37.463-0600    INFO    [publisher]     pipeline/retry.go:223     done
2026-02-18T18:37:38.949-0600    ERROR   [publisher_pipeline_output]     pipeline/output.go:180  failed to publish events: write tcp IP:PORT->LOGSTASHIP:4001: write: connection reset by peer

Because the application I am monitoring rotates logs very frequently under pressure and generates a massive volume.

There was a similar problem, check this topic.
Most likely you have network issues, not just lag from FB processing speed.

As Leandro Jedi said, remove stdin and stdout, especially the debug output, because it consumes a lot of resources.

Since you are using different versions, make sure to use the same version, or at least upgrade FB. This is not mandatory, of course.

Since I have a high volume of logs and a single Logstash instance with many cores, I configured multiple listeners to avoid a single TCP thread bottleneck at the input stage

Why don't you use LS pipelines, with separate .conf files? You might even be able to use the much faster dissect filter in some cases.
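As a rough sketch, splitting by log family in pipelines.yml could look like this (the pipeline IDs, config paths, and worker counts here are hypothetical):

```
# pipelines.yml - one isolated pipeline per log family
- pipeline.id: wap
  path.config: "/etc/logstash/conf.d/wap.conf"
  pipeline.workers: 6
- pipeline.id: colas-smpp
  path.config: "/etc/logstash/conf.d/colas-smpp.conf"
  pipeline.workers: 3
- pipeline.id: receptores-sms
  path.config: "/etc/logstash/conf.d/receptores-sms.conf"
  pipeline.workers: 3
```

And since some of the grok patterns above match lines with a fixed delimiter structure, one of them might be expressed with dissect, for example:

```
filter {
  dissect {
    mapping => {
      "message" => "%{}: Celular: %{celular},Transaccion: %{metodo},resultado:%{resultado},tiempo: %{tiempo},solicitante:%{solicitante},interfaceid:%{interfaz}"
    }
  }
}
```

Dissect does no regex backtracking, so for lines with a stable format it is typically much cheaper than grok.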

I see, I'll try the configs from that topic and report back what I find.

Thanks for the help!

This is important information as it may suggest that your issue is on the Filebeat side.

If your source logs are rotating faster than Filebeat can process them, they could pile up and lead to delay and data loss.

Looking at what you shared, I didn't see anything in Logstash that could lead to a delay, and you have plenty of resources on the Logstash server.

Can you add the ruby filter shared previously to your pipeline, so a timestamp of when Logstash processed the event is added to your documents, and then share some sample events from Kibana showing all three timestamps?

That other topic with "connection reset by peer" messages seems different. Effectively it was a quiet TCP connection, with no real traffic, so it got closed after an idle timeout, which can be prevented with different configuration options.

Check if the delayed logs are from same host(s) as appear in those reset log entries. Might be a red herring.

The crucial part is the various timestamps: what is the time of the actual log entry, when did Filebeat process the log, when did Logstash process the log, and when did Elasticsearch process the log?
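One way to capture the Elasticsearch-side time is an ingest pipeline with a set processor reading _ingest.timestamp. This is only a sketch; the pipeline name es-ingest-time and field name es_timestamp are made up here:

```
PUT _ingest/pipeline/es-ingest-time
{
  "processors": [
    {
      "set": {
        "field": "es_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
```

It can then be attached with `pipeline => "es-ingest-time"` in the Logstash elasticsearch output, giving a fourth timestamp to compare against the log line itself, @timestamp, and ls_timestamp.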


Thanks Kevin for the tip.

I'm sure Cesar will check any reasonable possibility for those cases.

@leandrojmp I just added the Logstash timestamp; these are the results:

(screenshot)

I reviewed my architecture and I cannot remove the clone filter, as the duplicated raw events are required by downstream systems. Also, in the Filebeat log I'm seeing this:

2026-02-19T11:06:16.538-0600    ERROR   [logstash]      logstash/async.go:280   Failed to publish events caused by: write tcp IP:PORT->LOGSTASHIP:4003: write: connection reset by peer
2026-02-19T11:06:16.538-0600    INFO    [publisher]     pipeline/retry.go:219   retryer: send unwait signal to consumer
2026-02-19T11:06:16.538-0600    INFO    [publisher]     pipeline/retry.go:223     done
2026-02-19T11:06:17.492-0600    ERROR   [publisher_pipeline_output]     pipeline/output.go:180  failed to publish events: write tcp IP:PORT->LOGSTASHIP:4004: write: connection reset by peer
2026-02-19T11:06:17.787-0600    ERROR   [publisher_pipeline_output]    

I commented out the stdin input and implemented a filter to only process non-clones.

@leandrojmp @RainTown What should I check next? After adding ls_timestamp, there's no proof that Logstash or Elasticsearch are causing the issue; the only remaining factor is Filebeat. This is a Kibana screenshot: there are two logs harvested at the same time but created by my app at different times. This is my Filebeat config:

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

queue.mem:
  events: 4096
  flush.min_events: 512
  flush.timeout: 2s

filebeat.inputs:
# ================== WAP / TOMCAT ==================
- type: filestream
  id: tomcat-wap-logs
  enabled: true
  paths:
    - /apache-tomcat-ws*/logs/catalina.out
    - /var/log/tomcats/*/catalina.out
  exclude_lines: ['.*MemcachedConnection.*','.*INFO: Added.*','.*Shut down memcached client.*','.*Could not redistribute to another node.*','.*Conexiones activas.*','.*ZV - EvaluarCondicion.*','.*evaluacion de resultado.*','.*Hay nodos.*','.*Cache hit.*']
  fields:
    log_type: "wap"
  fields_under_root: true
  tags: ["wap"]

  message_max_bytes: 10485760
  close.on_state_change.renamed: true
  close.on_state_change.removed: true
  close.on_state_change.inactive: 5m
  clean_inactive: 15m
  ignore_older: 72h

# ================== COLAS SMPP ==================
- type: filestream
  id: colas-smpp-logs
  enabled: true
  paths:
    - /var/log/smpp*.log
  exclude_lines: ['.*MemcachedConnection.*','.*INFO: Added.*','.*Shut down memcached client.*','.*Could not redistribute to another node.*','.*Conexiones activas.*','.*ZV - EvaluarCondicion.*','.*evaluacion de resultado.*','.*Hay nodos.*','.*Cache hit.*']
  fields:
    log_type: "colas-smpp"
  fields_under_root: true

  close.on_state_change.renamed: true
  close.on_state_change.removed: true
  close.on_state_change.inactive: 1m
  clean_inactive: 2h
  ignore_older: 72h

# ================ RECEPTORES SMS ================
- type: filestream
  id: receptores-sms-logs
  enabled: true
  paths:
    - /var/log/receptor*.log
  exclude_lines: ['.*MemcachedConnection.*','.*INFO: Added.*','.*Shut down memcached client.*','.*Could not redistribute to another node.*','.*Conexiones activas.*','.*ZV - EvaluarCondicion.*','.*evaluacion de resultado.*','.*Hay nodos.*','.*Cache hit.*']
  fields:
    log_type: "receptores-sms"
  fields_under_root: true

  close.on_state_change.renamed: true
  close.on_state_change.removed: true
  close.on_state_change.inactive: 1m
  clean_inactive: 2h
  ignore_older: 72h

# ==================== SALIDA ====================
output.logstash:
  hosts: ["LOGSTASH:4000","LOGSTASH:4001","LOGSTASH:4002","LOGSTASH:4003","LOGSTASH:4004"]
  bulk_max_size: 2048
  worker: 5
  loadbalance: true
  compression_level: 3
  ttl: 5m
  pipelining: 5
  timeout: 300

# ==================== LOGGING ===================
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
  path: /var/log/filebeat
  name: filebeat.log
  keepfiles: 7
  permissions: 0600

These are the only errors I found, but as I understand it, Logstash closes the connection to save resources and reopens it when needed:

2026-02-24T10:54:34.606-0600    ERROR   [logstash]      logstash/async.go:280   Failed to publish events caused by: write tcp 172.22.58.98:37404->172.24.20.18:4004: write: connection reset by peer
2026-02-24T10:54:34.606-0600    INFO    [publisher]     pipeline/retry.go:219   retryer: send unwait signal to consumer
2026-02-24T10:54:34.606-0600    INFO    [publisher]     pipeline/retry.go:223     done
2026-02-24T10:54:34.614-0600    ERROR   [logstash]      logstash/async.go:280   Failed to publish events caused by: write tcp 172.22.58.98:39440->172.24.20.18:4003: write: connection reset by peer
2026-02-24T10:54:34.614-0600    INFO    [publisher]     pipeline/retry.go:219   retryer: send unwait signal to consumer
2026-02-24T10:54:34.614-0600    INFO    [publisher]     pipeline/retry.go:223     done
2026-02-24T10:54:34.621-0600    ERROR   [logstash]      logstash/async.go:280   Failed to publish events caused by: write tcp IP:PORT->LOGSTASHIP:4001: write: connection reset by peer
2026-02-24T10:54:34.621-0600    INFO    [publisher]     pipeline/retry.go:219   retryer: send unwait signal to consumer
2026-02-24T10:54:34.621-0600    INFO    [publisher]     pipeline/retry.go:223     done
2026-02-24T10:54:34.627-0600    ERROR   [logstash]      logstash/async.go:280   Failed to publish events caused by: write tcp IP:PORT->LOGSTASHIP:PORT: write: connection reset by peer
2026-02-24T10:54:34.627-0600    INFO    [publisher]     pipeline/retry.go:219   retryer: send unwait signal to consumer
2026-02-24T10:54:34.627-0600    INFO    [publisher]     pipeline/retry.go:223     done
2026-02-24T10:54:35.882-0600    ERROR   [publisher_pipeline_output]     pipeline/output.go:180  failed to publish events: write tcp IP:PORT->LOGSTASHIP:PORT: write: connection reset by peer
2026-02-24T10:54:36.075-0600    ERROR   [publisher_pipeline_output]     pipeline/output.go:180  failed to publish events: write tcp IP:PORT->LOGSTASHIP:PORT: write: connection reset by peer
2026-02-24T10:54:36.388-0600    ERROR   [publisher_pipeline_output]     pipeline/output.go:180  failed to publish events: write tcp IP:PORT->LOGSTASHIP:PORT: write: connection reset by peer
2026-02-24T10:54:36.437-0600    ERROR   [publisher_pipeline_output]     pipeline/output.go:180  failed to publish events: write tcp IP:PORT->LOGSTASHIP:PORT: write: connection reset by peer

Thanks for looking into this

Maybe this will help...
On the Logstash side:
  • Increase client_inactivity_timeout (default is 60s)

On the Filebeat side:
  • Increase ttl, disable pipelining (pipelining: 0)
I'll try it!