Logstash - Slow data processing

I am using Filebeat and Logstash to set up a logging system that handles logs from around 35 servers. The log volume is currently large, and Logstash cannot process the data quickly enough. As a result, approximately 132 GB of logs are queued.

In my Logstash pipeline configuration, I am not using any filters or additional processing. I am simply taking input from Beats and writing the logs to files. For the queue, I am using the persisted type. However, I am struggling to achieve real-time log processing due to the slow handling of logs in the queue.

Here is my server configuration for Logstash:

  • CPU: 16 cores
  • Memory: 32 GB

My logstash.yml file configuration is as follows:
pipeline.workers: 16
pipeline.batch.size: 250
pipeline.batch.delay: 50
queue.type: persisted
queue.max_bytes: 8gb
queue.checkpoint.acks: 2048

Despite allocating sufficient resources, Logstash is unable to process logs at the expected speed. How can I optimize my setup to resolve this issue and achieve real-time log processing? Are there specific settings or architectural changes I should consider to handle this volume of logs efficiently?

Hi @Harsh08, welcome to the Elastic community. A few questions:

  1. Is your hardware being fully utilized, especially the CPUs?
  2. Is all the traffic going to a single Logstash instance?
  3. Can you set up monitoring to see where the bottleneck is? There might be an issue on the Elasticsearch side, and monitoring will help you determine this.

Let us know.

Hello @ashishtiwari1993 ,
Thank you for your response.
Here are the answers to your questions:

1) [Image: CPU utilization]
2) Yes, I am sending the logs from all servers to a single Logstash instance.

I think you need to scale out Logstash. You should add more Logstash nodes and enable load balancing in Filebeat.

Okay, thank you @ashishtiwari1993.
Can you please explain in more detail how I can do that?
Do you mean that I have to use Kafka for load balancing?

What is the type of disk of your Logstash node? Are you using HDD or SSD?

Persistent queues can impact performance, as every single message needs to be written to and read from disk before being processed.

Also, what are your Elasticsearch specs, including the disk type? Elasticsearch performance will also impact Logstash.

Do you have any information about the event rate? What counts as a large volume of logs in your case?

I've had cases where I had hundreds of servers sending system logs to a single Logstash instance without any issue, so you may be able to improve your performance without having to scale out.

From the screenshot you shared, I would assume that you are having I/O issues because of the persistent queue.

First of all, thank you for your response @leandrojmp.

What is the type of disk of your Logstash node? Are you using HDD or SSD? ---> SSD

Persistent queues can impact performance, as every single message needs to be written to and read from disk before being processed. ---> So what should I do to improve this? Change the queue type from persisted to memory?

Also, what are your Elasticsearch specs, including the disk type? Elasticsearch performance will also impact Logstash. ---> Currently I am not using Elasticsearch, only Logstash, to store the logs.
The disk size of the Logstash server is 300 GB.

Do you have any information about the event rate? What counts as a large volume of logs in your case? --->

"events" : {
        "in" : 12758919,
        "out" : 12061301,
        "filtered" : 12061301,
        "duration_in_millis" : 287628205,
        "queue_push_duration_in_millis" : 5019177
      },

This is for one pipeline. I have configured multiple pipelines.

In the pipelines.yml file I have also configured multiple pipelines for the different servers.

So, for example, if you have 5 Filebeat instances constantly sending traffic to one Logstash node, set up more Logstash nodes, say 2 more. Add all the Logstash endpoints to the Filebeat output configuration and enable load balancing. All Filebeat instances will then distribute their traffic across the three Logstash servers. This is how you can scale horizontally.

You can achieve the same using Kafka, but the approach above can be implemented quickly.
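
The Filebeat side of this load-balancing setup could look roughly like the sketch below. The host names and port are placeholders (assumptions, not values from this thread); the port has to match the beats input of the Logstash pipeline that should receive the events.

```yaml
# filebeat.yml (sketch): send to several Logstash nodes with load balancing.
# Host names and port are placeholders; use your own Logstash endpoints.
output.logstash:
  hosts:
    - "logstash-1.example.internal:5044"
    - "logstash-2.example.internal:5044"
    - "logstash-3.example.internal:5044"
  # Without this, Filebeat picks one host and only switches on failure.
  loadbalance: true
```

Every Logstash node in the list needs the same pipeline configuration, since any of them may receive a given batch.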

Let us know.

So, what is your output then? Is it a file output on the same host?

If so, you are receiving the data, writing it to disk, reading it from disk, processing it in your pipeline and writing it to disk again.

Depending on the event rate, event size and disk speed, this will impact performance, and it matches the screenshot you shared, where you have a high system load.

The persistent queue is normally used as a buffer of events when your output cannot keep up with the event rate from Logstash. In your case, though, if your disk cannot write events fast enough, the persistent queue will start to fill up, which will then use even more IO on your system.

In this case I would use the default memory queue to avoid the duplicate IO usage.
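
As a minimal sketch, switching back to the memory queue is a one-line change in logstash.yml. The commented-out values are the ones from the original post, shown only to make clear that they stop applying.

```yaml
# logstash.yml (sketch): use the in-memory queue, which is the default.
queue.type: memory

# These settings only apply when queue.type is "persisted",
# so they can stay commented out with the memory queue.
# queue.max_bytes: 8gb
# queue.checkpoint.acks: 2048
```

Keep in mind that the memory queue holds in-flight events only in RAM, so they are lost if Logstash stops abruptly.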

It depends on the use case. Personally I do not use persistent queues, as they do not scale and require your Logstash nodes to have larger and faster disks. I prefer to treat the Logstash nodes as stateless, so I use Kafka as a log buffer: my logs are sent to Kafka, and then I can have multiple Logstash instances consuming from the Kafka topics.

How you send logs to Kafka depends on what tools you are using. Beats can send directly to Kafka, but for other data sources, like firewalls, you may need something between them and Kafka. This can be another Logstash layer, Filebeat or Vector (from Datadog). I'm currently moving everything to Vector because of limitations in both Filebeat and Logstash for my use cases.
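
For the Beats-to-Kafka path, a Filebeat output could be sketched as below. The broker addresses and topic name are placeholders (assumptions, not values from this thread).

```yaml
# filebeat.yml (sketch): ship events to Kafka instead of Logstash.
# Broker addresses and the topic name are placeholders.
output.kafka:
  hosts:
    - "kafka-1.example.internal:9092"
    - "kafka-2.example.internal:9092"
  topic: "application-logs"
  # Wait for the partition leader to acknowledge each batch.
  required_acks: 1
  compression: gzip
```

Filebeat allows only one output to be enabled at a time, so this would replace the output.logstash section. The Logstash nodes would then read from the topic with the kafka input plugin.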

so, you are receiving the data, writing it to disk, reading it from disk, processing it in your pipeline and writing it again on disk.

Filebeat on the application servers sends the logs to a single logger server (the Logstash server), and Logstash receives the logs and writes them to the respective log file.

input {
  beats {
    port => 5582
    add_field => { "source_ip" => "%{[host][ip]}" }
  }
}

output {
  file {
    path => "/logging/Logs/application-1.log"
    codec => line { format => "%{source_ip} - %{message}" }
  }
  stdout { codec => rubydebug }
}

This is my config file.
I have configured multiple config files for the different ports. Logstash receives the logs on the respective port and writes them to the file.

Yes, but it will also write it to the persistent queue on the same disks. It might also have to read it back from the queue before it can write it in the file.

That significantly increases the amount of IO, and it is not clear that it actually protects you. When would writing to the file fail under circumstances where writing to the PQ does not?

So, instead of the persisted queue, should I use the default memory queue?
The reason I am using the persisted queue is that I observed log loss when a large amount of logs arrived at Logstash, so I switched to persisted as a fix.
Currently I have 115 GB of logs in the queue. At night all my applications are stopped and no logs are generated, so Logstash should work through the queued data during those hours, but it does not.

I have changed the queue type from persisted to memory, but after the change I observed log loss. For one application there are 14 lines on the application server but only 2 lines on the Logstash server.
For another application there are 104 log events on the application server but only 29 lines on the Logstash server.

And how is the load on your server after you changed all your pipelines to use the memory queue?

Also, remove this from your output.

stdout { codec => rubydebug }

There is no reason to have this output except while testing pipelines; with it you are unnecessarily writing the events to a log file as well.


There is no change in the CPU load after modifying the queue type.

Also, remove this from your output.

Sure, I will remove that from all config files.

This is even worse than your first screenshot.

Do you run anything else on this server besides Logstash?

Your server is under a heavy load. I would say it is probably related to IO; your disk may not be fast enough for the event rate you have.

Can you run iotop and iostat to get more insight into the IO usage?

Do you run anything else on this server besides Logstash?

No, I am just running logstash on this server.

Can upgrading the server resolve this issue?


This is when I use the persisted queue type.

I don't think so; there is no guarantee, since the root cause has not been found.

Troubleshooting Logstash performance can be pretty complicated, as it depends on many things.

How many pipelines do you have? Can you share the entire logstash.yml and pipelines.yml?

Do your pipeline configurations have any filters, or just the beats input and file output that you shared previously?

Hello @leandrojmp,
Sorry for the late reply.
I am sharing both files below:

logstash.yml

# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
#
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#
# node.name: test
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#
path.data: /var/lib/logstash
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
# pipeline.workers: 2
#
pipeline.workers: 16
# How many events to retrieve from inputs before sending to filters+workers
#
# pipeline.batch.size: 125

pipeline.batch.size: 250
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: Enabling this can lead to data loss during shutdown
#
# pipeline.unsafe_shutdown: false
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" automatically enables ordering if the 'pipeline.workers' setting
# is also set to '1', and disables otherwise.
# "true" enforces ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" disables any extra processing necessary for preserving ordering.
#
# pipeline.ordered: auto
#
# Sets the pipeline's default value for `ecs_compatibility`, a setting that is
# available to plugins that implement an ECS Compatibility mode for use with
# the Elastic Common Schema.
# Possible values are:
# - disabled
# - v1
# - v8 (default)
# Pipelines defined before Logstash 8 operated without ECS in mind. To ensure a
# migrated pipeline continues to operate as it did before your upgrade, opt-OUT
# of ECS for the individual pipeline in its `pipelines.yml` definition. Setting
# it here will set the default for _all_ pipelines, including new ones.
#
# pipeline.ecs_compatibility: v8
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
# config.reload.automatic: false
#
# How often to check if the pipeline configuration has changed (in seconds)
# Note that the unit value (s) is required. Values without a qualifier (e.g. 60)
# are treated as nanoseconds.
# Setting the interval this way is not recommended and might change in later versions.
#
# config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false
#
# ------------ API Settings -------------
# Define settings related to the HTTP API here.
#
# The HTTP API is enabled by default. It can be disabled, but features that rely
# on it will not work as intended.
#
# api.enabled: true
#
# By default, the HTTP API is not secured and is therefore bound to only the
# host's loopback interface, ensuring that it is not accessible to the rest of
# the network.
# When secured with SSL and Basic Auth, the API is bound to _all_ interfaces
# unless configured otherwise.
#
#api.http.host: 127.0.0.1
#
# The HTTP API web server will listen on an available port from the given range.
# Values can be specified as a single port (e.g., `9600`), or an inclusive range
# of ports (e.g., `9600-9700`).
#
#api.http.port: 9600-9700
#
# The HTTP API includes a customizable "environment" value in its response,
# which can be configured here.
#
# api.environment: "production"
#
# The HTTP API can be secured with SSL (TLS). To do so, you will need to provide
# the path to a password-protected keystore in p12 or jks format, along with credentials.
#
# api.ssl.enabled: false
# api.ssl.keystore.path: /path/to/keystore.jks
# api.ssl.keystore.password: "y0uRp4$$w0rD"
#
# The availability of SSL/TLS protocols depends on the JVM version. Certain protocols are
# disabled by default and need to be enabled manually by changing `jdk.tls.disabledAlgorithms`
# in the $JDK_HOME/conf/security/java.security configuration file.
#
# api.ssl.supported_protocols: [TLSv1.2,TLSv1.3]
#
# The HTTP API can be configured to require authentication. Acceptable values are
#  - `none`:  no auth is required (default)
#  - `basic`: clients must authenticate with HTTP Basic auth, as configured
#             with `api.auth.basic.*` options below
# api.auth.type: none
#
# When configured with `api.auth.type` `basic`, you must provide the credentials
# that requests will be validated against. Usage of Environment or Keystore
# variable replacements is encouraged (such as the value `"${HTTP_PASS}"`, which
# resolves to the value stored in the keystore's `HTTP_PASS` variable if present
# or the same variable from the environment)
#
# api.auth.basic.username: "logstash-user"
# api.auth.basic.password: "s3cUreP4$$w0rD"
#
# When setting `api.auth.basic.password`, the password should meet
# the default password policy requirements.
# The default password policy requires non-empty minimum 8 char string that
# includes a digit, upper case letter and lower case letter.
# Policy mode sets Logstash to WARN or ERROR when HTTP authentication password doesn't
# meet the password policy requirements.
# The default is WARN. Setting to ERROR enforces stronger passwords (recommended).
#
# api.auth.basic.password_policy.mode: WARN
#
# ------------ Module Settings ---------------
# Define modules here.  Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
#
#queue.type: memory
#
queue.type: persisted
# If `queue.type: persisted`, the directory path where the pipeline data files will be stored.
# Each pipeline will group its PQ files in a subdirectory matching its `pipeline.id`.
# Default is path.data/queue.
#
path.queue: /central-logging/logstash-queue
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#
# queue.max_bytes: 1024mb
#
queue.max_bytes: 8gb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
queue.checkpoint.acks: 2048
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ
# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files
# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and
# being available to be read by the dead_letter_queue input when items are written infrequently.
# Default is 5000.
#
# dead_letter_queue.flush_interval: 5000

# If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit.
# Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit.
# Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones.
#
# dead_letter_queue.storage_policy: drop_newer

# If using dead_letter_queue.enable: true, the interval that events have to be considered valid. After the interval has
# expired the events could be automatically deleted from the DLQ.
# The interval could be expressed in days, hours, minutes or seconds, using as postfix notation like 5d,
# to represent a five days interval.
# The available units are respectively d, h, m, s for day, hours, minutes and seconds.
# If not specified then the DLQ doesn't use any age policy for cleaning events.
#
# dead_letter_queue.retain.age: 1d

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
# log.level: info
#
# Options for log.format:
#   * plain (default)
#   * json
#
# log.format: plain
# log.format.json.fix_duplicate_message_fields: false
#
path.logs: /var/log/logstash
#
# ------------ Other Settings --------------
#
# Allow or block running Logstash as superuser (default: true)
# allow_superuser: false
#
# Where to find custom plugins
# path.plugins: []
#
# Flag to output log lines of each pipeline in its separate log file. Each log filename contains the pipeline.name
# Default is false
# pipeline.separate_logs: false
#
# Determine where to allocate memory buffers, for plugins that leverage them.
# Default to direct, optionally can be switched to heap to select Java heap space.
# pipeline.buffer.type: direct
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.proxy: ["http://proxy:port"]
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.monitoring.elasticsearch.api_key: "id:api_key"
#xpack.monitoring.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"
#xpack.monitoring.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
# use either keystore.path/keystore.password or certificate/key configurations
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.certificate: /path/to/file
#xpack.monitoring.elasticsearch.ssl.key: /path/to/key
#xpack.monitoring.elasticsearch.ssl.verification_mode: full
#xpack.monitoring.elasticsearch.ssl.cipher_suites: []
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#xpack.management.enabled: false
#xpack.management.pipeline.id: ["main", "apache_logs"]
#xpack.management.elasticsearch.username: logstash_admin_user
#xpack.management.elasticsearch.password: password
#xpack.management.elasticsearch.proxy: ["http://proxy:port"]
#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.management.elasticsearch.cloud_id: management_cluster_id:xxxxxxxxxx
#xpack.management.elasticsearch.cloud_auth: logstash_admin_user:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.management.elasticsearch.api_key: "id:api_key"
#xpack.management.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx
#xpack.management.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"
#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file
#xpack.management.elasticsearch.ssl.truststore.password: password
# use either keystore.path/keystore.password or certificate/key configurations
#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.management.elasticsearch.ssl.keystore.password: password
#xpack.management.elasticsearch.ssl.certificate: /path/to/file
#xpack.management.elasticsearch.ssl.key: /path/to/certificate_key_file
#xpack.management.elasticsearch.ssl.cipher_suites: []
#xpack.management.elasticsearch.ssl.verification_mode: full
#xpack.management.elasticsearch.sniffing: false
#xpack.management.logstash.poll_interval: 5s

# X-Pack GeoIP Database Management
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html#plugins-filters-geoip-manage_update
#xpack.geoip.downloader.enabled: true
#xpack.geoip.downloader.endpoint: "https://geoip.elastic.co/v1/database"

pipelines.yml

- pipeline.id: kp-bulk-5577-5-5
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5577-5-5-filebeat.conf"

- pipeline.id: kp-bulk-5578-5-6
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5578-5-6-filebeat.conf"

- pipeline.id: kp-bulk-5579-5-7
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5579-5-7-filebeat.conf"

- pipeline.id: kp-bulk-5580-5-8
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5580-5-8-filebeat.conf"

- pipeline.id: kp-bulk-5581-5-141
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5581-5-141-filebeat.conf"

- pipeline.id: kp-bulk-5582-5-142
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5582-5-142-filebeat.conf"

- pipeline.id: kp-bulk-5583-5-143
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5583-5-143-filebeat.conf"

- pipeline.id: kp-bulk-5584-5-144
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5584-5-144-filebeat.conf"

- pipeline.id: kp-bulk-5585-5-146
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5585-5-146-filebeat.conf"

- pipeline.id: kp-bulk-5586-5-148
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5586-5-148-filebeat.conf"

- pipeline.id: kp-bulk-5587-5-149
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5587-5-149-filebeat.conf"

- pipeline.id: kp-bulk-5588-5-150
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5588-5-150-filebeat.conf"

- pipeline.id: kp-bulk-5589-5-151
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5589-5-151-filebeat.conf"

- pipeline.id: kp-bulk-5590-5-152
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5590-5-152-filebeat.conf"

- pipeline.id: kp-bulk-5591-5-153
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5591-5-153-filebeat.conf"

- pipeline.id: kp-bulk-5592-5-154
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5592-5-154-filebeat.conf"

- pipeline.id: kp-bulk-5593-5-155
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5593-5-155-filebeat.conf"

- pipeline.id: gp-5555-5-173
  path.config: "/etc/logstash/conf.d/gateway-products_5555-5173.conf"

- pipeline.id: misc-5607-5-190
  path.config: "/etc/logstash/conf.d/kairos-misc-filebeat_5607-5-190.conf"

- pipeline.id: kfp-5557-5-169
  path.config: "/etc/logstash/conf.d/kairos-file-processor_5557-5-169.conf"

- pipeline.id: kpp-5558-5-10
  path.config: "/etc/logstash/conf.d/kairos-product-preprocessor_5558-5-10.conf"

- pipeline.id: Agnet-5570-2-9
  path.config: "/etc/logstash/conf.d/agent-services_5570-2-9.conf"

- pipeline.id: reports
  path.config: "/etc/logstash/conf.d/kairos-reports*.conf"

- pipeline.id: realtime-5565-5-161
  path.config: "/etc/logstash/conf.d/kairos-realtime_5565-5-161.conf"

- pipeline.id: kpm-bulk-5602-5-11
  path.config: "/etc/logstash/conf.d/kairos-product-manager-bulk_5602-5-11.conf"

- pipeline.id: kpm-bulk-5603-5-27
  path.config: "/etc/logstash/conf.d/kairos-product-manager-bulk_5603-5-27.conf"

- pipeline.id: kpm-bulk-5604-5-159
  path.config: "/etc/logstash/conf.d/kairos-product-manager-bulk_5604-5-159.conf"

- pipeline.id: kpm-realtime-5573-5-4
  path.config: "/etc/logstash/conf.d/kairos-product-manager-realtime_5573-5-4.conf"

- pipeline.id: kp-realtime-5594-5-20
  path.config: "/etc/logstash/conf.d/kairos-products-realtime_5594-5-20.conf"

- pipeline.id: kp-realtime-5595-5-145
  path.config: "/etc/logstash/conf.d/kairos-products-realtime_5595-5-145.conf"

- pipeline.id: kp-realtime-5596-5-24
  path.config: "/etc/logstash/conf.d/kairos-products-realtime_5596-5-24.conf"

- pipeline.id: kp-realtime-5597-5-13
  path.config: "/etc/logstash/conf.d/kairos-products-realtime_5597-5-13.conf"

- pipeline.id: kp-realtime-5598-5-133
  path.config: "/etc/logstash/conf.d/kairos-products-realtime_5598-5-133.conf"

- pipeline.id: kp-realtime-5599-5-155
  path.config: "/etc/logstash/conf.d/kairos-products-realtime_5599-5-155.conf"

- pipeline.id: text-5600-5-187
  path.config: "/etc/logstash/conf.d/text-processor_5600-5-187.conf"

- pipeline.id: text-5601-5-174
  path.config: "/etc/logstash/conf.d/text-processor_5601-5-174.conf"
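
A note on how the two files above interact, offered as something to test rather than a diagnosis: values in logstash.yml act as defaults for every pipeline, so pipeline.workers: 16 and queue.max_bytes: 8gb apply to each pipeline listed here separately, not to Logstash as a whole. Settings can be overridden per pipeline in pipelines.yml. The sketch below reuses two of the pipeline ids above; the worker counts, the queue size and the choice of which pipeline keeps the persisted queue are illustrative assumptions.

```yaml
# pipelines.yml (sketch): per-pipeline overrides of the logstash.yml defaults.
- pipeline.id: kp-bulk-5577-5-5
  path.config: "/etc/logstash/conf.d/kairos-products-bulk_5577-5-5-filebeat.conf"
  pipeline.workers: 2      # assumed value, tune per pipeline
  queue.type: memory

- pipeline.id: kp-realtime-5594-5-20
  path.config: "/etc/logstash/conf.d/kairos-products-realtime_5594-5-20.conf"
  pipeline.workers: 2      # assumed value, tune per pipeline
  queue.type: persisted
  queue.max_bytes: 1gb     # assumed value, limit for this pipeline only
```

Changing one or two pipelines first and comparing their event stats against the rest is a low-risk way to see whether the per-pipeline defaults matter on this host.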

Hi.
I recommend installing Prometheus (if you don't have it already) and loading my dashboard: Logstash monitoring | Grafana Labs

What you should check:

  1. Run the command
vmstat 1

If you do NOT have an IO problem, you will see something like this:

procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 5  0 2916708 11383804   7764 7574392    0    0   717  1087    0    0 12  3 84  1  0
 3  0 2916708 11374416   7764 7584648    0    0  5180 13963 64565 71721  8  3 89  0  0
 4  0 2916708 11367956   7772 7591472    0    0   564 15718 63455 70713  7  2 90  0  0
 7  0 2916708 11348560   7772 7596736    0    0  6016 12128 65312 68429 12  3 85  0  0
 6  0 2916708 11357632   7772 7603072    0    0  2244 10907 63391 66601  9  3 88  0  0

Columns to look at:

b - 0 - number of processes waiting on disk
si/so - 0 (swap in/out)
wa - 0 (CPU time spent waiting for IO)
  2. Check the Logstash memory settings in the jvm.options file:
## JVM configuration

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

-Xms2g
-Xmx2g

If you give it too little memory, it causes frequent garbage collections. You can check this on the dashboard.

Every GC run uses a fair amount of CPU time.
If you have too many GCs, your CPU will be spent on GC, and you must add memory (in extreme cases, tune the JVM).

If you don't have Prometheus and Grafana, you can uncomment the following in the jvm.options file:

...
-XX:+PrintGCTimeStamps
...
# log GC status to a file with time stamps
# ensure the directory exists
-Xloggc:${LS_GC_LOG_FILE}

Don't forget to set the LS_GC_LOG_FILE variable,
and then check the file.

Comment this out again after checking, or your disk will run out of space =)

Or use the documentation to check GC:

VisualGC

P.S