Logstash "block in start_workers" on shutdown

All of our Logstash instances get tons of warnings on shutdown.
The warnings seem to tell us that something is blocked: "block in start_workers"

Our Logstash applications run with pipelines.yml:

pipelines.yml example
- pipeline.id: xferlog-pipeline
  path.config: "/<internal-path>/xferlog.conf"
- pipeline.id: syslog-pipeline
  path.config: "/<internal-path>/syslog.conf"
Stacktrace
[2024-11-04T15:01:33,487][WARN ][org.logstash.execution.ShutdownWatcherExt] {"inflight_count"=>0, "stalling_threads_info"=>{"other"=>[{"thread_id"=>6533, "name"=>"[xferlog-pipeline]<file", "current_call"=>"
[...]/vendor/bundle/jruby/3.1.0/gems/logstash-input-file-4.4.5/lib/filewatch/watch.rb:55:in `sleep'"}, {"thread_id"=>6534, "name"=>"[xferlog-pipeline]<file", "current_call"=>"
[...]/vendor/bundle/jruby/3.1.0/gems/logstash-input-file-4.4.5/lib/filewatch/watch.rb:55:in `sleep'"}, {"thread_id"=>6535, "name"=>"[xferlog-pipeline]<file", "current_call"=>"
[...]/vendor/bundle/jruby/3.1.0/gems/logstash-input-file-4.4.5/lib/filewatch/watch.rb:55:in `sleep'"}, {"thread_id"=>6536, "name"=>"[xferlog-pipeline]<file", "current_call"=>"
[...]/vendor/bundle/jruby/3.1.0/gems/logstash-input-file-4.4.5/lib/filewatch/watch.rb:55:in `sleep'"}, {"thread_id"=>6537, "name"=>"[xferlog-pipeline]<file", "current_call"=>"
[...]/vendor/bundle/jruby/3.1.0/gems/logstash-input-file-4.4.5/lib/filewatch/watch.rb:55:in `sleep'"}, {"thread_id"=>6538, "name"=>"[xferlog-pipeline]<file", "current_call"=>"
[...]/vendor/bundle/jruby/3.1.0/gems/logstash-input-file-4.4.5/lib/filewatch/watch.rb:55:in `sleep'"}, {"thread_id"=>6539, "name"=>"[xferlog-pipeline]<file", "current_call"=>"
[...]/vendor/bundle/jruby/3.1.0/gems/logstash-input-file-4.4.5/lib/filewatch/watch.rb:55:in `sleep'"}, {"thread_id"=>6482, "name"=>"[xferlog-pipeline]-pipeline-manager", "current_call"=>"
[...]/vendor/bundle/jruby/3.1.0/gems/thwait-0.2.0/lib/thwait.rb:112:in `pop'"}], 
["LogStash::Filters::GeoIP", {"add_tag"=>["geoip"], "source"=>"[ip]", "id"=>"8307b8bfd76f245feab6f6123316261140e43c8778f089da8e980049e7d9eaff", "target"=>"geoip"}]=>[{"thread_id"=>6509, "name"=>"[xferlog-pipeline]>worker0", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6510, "name"=>"[xferlog-pipeline]>worker1", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6511, "name"=>"[xferlog-pipeline]>worker2", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6512, "name"=>"[xferlog-pipeline]>worker3", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6513, "name"=>"[xferlog-pipeline]>worker4", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6514, "name"=>"[xferlog-pipeline]>worker5", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6515, "name"=>"[xferlog-pipeline]>worker6", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6516, "name"=>"[xferlog-pipeline]>worker7", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6517, "name"=>"[xferlog-pipeline]>worker8", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6518, "name"=>"[xferlog-pipeline]>worker9", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6519, "name"=>"[xferlog-pipeline]>worker10", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6520, "name"=>"[xferlog-pipeline]>worker11", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6521, "name"=>"[xferlog-pipeline]>worker12", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6522, "name"=>"[xferlog-pipeline]>worker13", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6523, "name"=>"[xferlog-pipeline]>worker14", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6524, "name"=>"[xferlog-pipeline]>worker15", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6525, "name"=>"[xferlog-pipeline]>worker16", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6526, "name"=>"[xferlog-pipeline]>worker17", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6527, "name"=>"[xferlog-pipeline]>worker18", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6528, "name"=>"[xferlog-pipeline]>worker19", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6529, "name"=>"[xferlog-pipeline]>worker20", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6530, "name"=>"[xferlog-pipeline]>worker21", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6531, "name"=>"[xferlog-pipeline]>worker22", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>6532, "name"=>"[xferlog-pipeline]>worker23", "current_call"=>"
[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}]}}
....
[2024-11-04T15:01:33,488][ERROR][org.logstash.execution.ShutdownWatcherExt] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.
...
[2024-11-04T15:07:54,463][INFO ][logstash.javapipeline    ][xferlog-pipeline] Pipeline terminated {"pipeline.id"=>"xferlog-pipeline"}
[2024-11-04T15:07:54,537][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:"xferlog-pipeline"}
...
[2024-11-04T15:10:20,108][INFO ][logstash.javapipeline    ][syslog-pipeline] Pipeline terminated {"pipeline.id"=>"syslog-pipeline"}
[2024-11-04T15:10:20,368][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:"syslog-pipeline"}
[2024-11-04T15:10:20,378][INFO ][logstash.runner          ] Logstash shut down.
logstash.yml
# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
#
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#
# node.name: test
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#
# path.data:
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
# pipeline.workers: 2
#
# How many events to retrieve from inputs before sending to filters+workers
#
# pipeline.batch.size: 125
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
# pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: Enabling this can lead to data loss during shutdown
#
# pipeline.unsafe_shutdown: false
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" automatically enables ordering if the 'pipeline.workers' setting
# is also set to '1', and disables otherwise.
# "true" enforces ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" disables any extra processing necessary for preserving ordering.
#
# pipeline.ordered: auto
#
# Sets the pipeline's default value for `ecs_compatibility`, a setting that is
# available to plugins that implement an ECS Compatibility mode for use with
# the Elastic Common Schema.
# Possible values are:
# - disabled
# - v1
# - v8 (default)
# Pipelines defined before Logstash 8 operated without ECS in mind. To ensure a
# migrated pipeline continues to operate as it did before your upgrade, opt-OUT
# of ECS for the individual pipeline in its `pipelines.yml` definition. Setting
# it here will set the default for _all_ pipelines, including new ones.
#
# pipeline.ecs_compatibility: v8
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
# config.reload.automatic: false
#
# How often to check if the pipeline configuration has changed (in seconds)
# Note that the unit value (s) is required. Values without a qualifier (e.g. 60)
# are treated as nanoseconds.
# Setting the interval this way is not recommended and might change in later versions.
#
# config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false
#
# ------------ API Settings -------------
# Define settings related to the HTTP API here.
#
# The HTTP API is enabled by default. It can be disabled, but features that rely
# on it will not work as intended.
#
# api.enabled: true
#
# By default, the HTTP API is not secured and is therefore bound to only the
# host's loopback interface, ensuring that it is not accessible to the rest of
# the network.
# When secured with SSL and Basic Auth, the API is bound to _all_ interfaces
# unless configured otherwise.
#
api.http.host: 192.168.230.120
#
# The HTTP API web server will listen on an available port from the given range.
# Values can be specified as a single port (e.g., `9600`), or an inclusive range
# of ports (e.g., `9600-9700`).
#
# api.http.port: 9600-9700
#
# The HTTP API includes a customizable "environment" value in its response,
# which can be configured here.
#
# api.environment: "production"
#
# The HTTP API can be secured with SSL (TLS). To do so, you will need to provide
# the path to a password-protected keystore in p12 or jks format, along with credentials.
#
# api.ssl.enabled: false
# api.ssl.keystore.path: /path/to/keystore.jks
# api.ssl.keystore.password: "y0uRp4$$w0rD"
#
# The availability of SSL/TLS protocols depends on the JVM version. Certain protocols are
# disabled by default and need to be enabled manually by changing `jdk.tls.disabledAlgorithms`
# in the $JDK_HOME/conf/security/java.security configuration file.
#
# api.ssl.supported_protocols: [TLSv1.2,TLSv1.3]
#
# The HTTP API can be configured to require authentication. Acceptable values are
#  - `none`:  no auth is required (default)
#  - `basic`: clients must authenticate with HTTP Basic auth, as configured
#             with `api.auth.basic.*` options below
# api.auth.type: none
#
# When configured with `api.auth.type` `basic`, you must provide the credentials
# that requests will be validated against. Usage of Environment or Keystore
# variable replacements is encouraged (such as the value `"${HTTP_PASS}"`, which
# resolves to the value stored in the keystore's `HTTP_PASS` variable if present
# or the same variable from the environment)
#
# api.auth.basic.username: "logstash-user"
# api.auth.basic.password: "s3cUreP4$$w0rD"
#
# When setting `api.auth.basic.password`, the password should meet
# the default password policy requirements.
# The default password policy requires non-empty minimum 8 char string that
# includes a digit, upper case letter and lower case letter.
# Policy mode sets Logstash to WARN or ERROR when HTTP authentication password doesn't
# meet the password policy requirements.
# The default is WARN. Setting to ERROR enforces stronger passwords (recommended).
#
# api.auth.basic.password_policy.mode: WARN
#
# ------------ Module Settings ---------------
# Define modules here.  Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
#
# queue.type: memory
#
# If `queue.type: persisted`, the directory path where the pipeline data files will be stored.
# Each pipeline will group its PQ files in a subdirectory matching its `pipeline.id`.
# Default is path.data/queue.
#
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#
# queue.max_bytes: 1024mb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ
# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files
# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and
# being available to be read by the dead_letter_queue input when items are written infrequently.
# Default is 5000.
#
# dead_letter_queue.flush_interval: 5000

# If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit.
# Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit.
# Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones.
#
# dead_letter_queue.storage_policy: drop_newer

# If using dead_letter_queue.enable: true, the interval that events have to be considered valid. After the interval has
# expired the events could be automatically deleted from the DLQ.
# The interval could be expressed in days, hours, minutes or seconds, using as postfix notation like 5d,
# to represent a five days interval.
# The available units are respectively d, h, m, s for day, hours, minutes and seconds.
# If not specified then the DLQ doesn't use any age policy for cleaning events.
#
# dead_letter_queue.retain.age: 1d

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
# log.level: info
# path.logs:
# Lgq 240116
path.logs: /data2/appelk/logs
#
# ------------ Other Settings --------------
#
# Allow or block running Logstash as superuser (default: true)
# allow_superuser: false
#
# Where to find custom plugins
# path.plugins: []
#
# Flag to output log lines of each pipeline in its separate log file. Each log filename contains the pipeline.name
# Default is false
# pipeline.separate_logs: false
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.proxy: ["http://proxy:port"]
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.monitoring.elasticsearch.api_key: "id:api_key"
#xpack.monitoring.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"
#xpack.monitoring.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
# use either keystore.path/keystore.password or certificate/key configurations
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.certificate: /path/to/file
#xpack.monitoring.elasticsearch.ssl.key: /path/to/key
#xpack.monitoring.elasticsearch.ssl.verification_mode: full
#xpack.monitoring.elasticsearch.ssl.cipher_suites: []
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#xpack.management.enabled: false
#xpack.management.pipeline.id: ["main", "apache_logs"]
#xpack.management.elasticsearch.username: logstash_admin_user
#xpack.management.elasticsearch.password: password
#xpack.management.elasticsearch.proxy: ["http://proxy:port"]
#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.management.elasticsearch.cloud_id: management_cluster_id:xxxxxxxxxx
#xpack.management.elasticsearch.cloud_auth: logstash_admin_user:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.management.elasticsearch.api_key: "id:api_key"
#xpack.management.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx
#xpack.management.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"
#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file
#xpack.management.elasticsearch.ssl.truststore.password: password
# use either keystore.path/keystore.password or certificate/key configurations
#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.management.elasticsearch.ssl.keystore.password: password
#xpack.management.elasticsearch.ssl.certificate: /path/to/file
#xpack.management.elasticsearch.ssl.key: /path/to/certificate_key_file
#xpack.management.elasticsearch.ssl.cipher_suites: []
#xpack.management.elasticsearch.ssl.verification_mode: full
#xpack.management.elasticsearch.sniffing: false
#xpack.management.logstash.poll_interval: 5s

# X-Pack GeoIP plugin
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html#plugins-filters-geoip-manage_update
#xpack.geoip.download.endpoint: "https://geoip.elastic.co/v1/database"
#xpack.geoip.downloader.enabled: true
.conf example
input {
    file {
        type => "boron"
        path => "/<internal-path>/logs/syslog"
        start_position => "beginning"
        stat_interval => "600 second"
    }
    file {
        type => "ddpftp02"
        path => "/<internal-path>/logs/syslog"
        start_position => "beginning"
        stat_interval => "600 second"
    }
    file {
        type => "ddpftp03"
        path => "/<internal-path>/logs/syslog"
        start_position => "beginning"
        stat_interval => "600 second"
    }
    file {
        type => "thpftp02"
        path => "/<internal-path>/logs/syslog"
        start_position => "beginning"
        stat_interval => "600 second"
    }
    file {
        type => "thpftp03"
        path => "/<internal-path>/logs/syslog"
        start_position => "beginning"
        stat_interval => "600 second"
    }
    file {
        type => "thpftp04"
        path => "/<internal-path>/logs/syslog"
        start_position => "beginning"
        stat_interval => "600 second"
    }
    file {
        type => "thpftp05"
        path => "/<internal-path>/logs/syslog"
        start_position => "beginning"
        stat_interval => "600 second"
    }
}
filter {
    if "USER" in [message] {
        fingerprint {
            target => "[@metadata][fingerprint]"
            method => "SHA256"
            source => ["message"]
        }
        if "boron" in [message] {
            grok {
                match => {"message" => "%{MONTH:[@metadata][month]}\s+%{MONTHDAY:[@metadata][monthday]}\s+%{TIME:[@metadata][time]}\s+%{DATA:server}\s+%{DATA}\[%{NUMBER:pid:int}\]\s+%{DATA}\(%{DATA}\[%{IP:ip}\]\):\s%{DATA}\s%{DATA:user}:\s+%{GREEDYDATA:status}"}
                add_tag => ["log"]
                overwrite => ["message"]
            }
            mutate { add_field => {"logTime" => "%{+YYYY}-%{[@metadata][month]}-%{[@metadata][monthday]};%{[@metadata][time]}.000" }}
        } else {
            grok {
                match => {"message" => "%{TIMESTAMP_ISO8601:logTime}\s+%{DATA:server}\s+%{DATA}\[%{NUMBER:pid:int}\]\s+%{DATA}\(%{DATA}\[%{IP:ip}\]\):\s%{DATA}\s%{DATA:user}:\s+%{GREEDYDATA:status}"}
                add_tag => ["log"]
                overwrite => ["message"]
            }
        }

        if "_grokparsefailure" in [tags] {
            drop { }
        }
        date {
            match => ["logTime", "YYYY-MMM-dd;HH:mm:ss.SSS", "ISO8601"]
            timezone => "Europe/Oslo"
            target => ["logTime"]
        }
        if "192.168." not in [message] {
            geoip {
                source => "[ip]"
                target => "geoip"
                add_tag => ["geoip"]
            }
        }
    } else {
        drop { }
    }
}
output {
    elasticsearch {
        hosts => ["https://*****:9200"]
        index => "ftp-sys-log-%{+YYYY.MM.dd}"
        user => "logstash_internal"
        password => "*******"
        ssl_certificate_authorities => "/<internal-path>/****.pem"
        document_id => "%{[@metadata][fingerprint]}"
    }
}

At first I thought it was always related to LogStash::Filters::GeoIP, but I see different warnings all the time, such as: LogStash::Filters::Drop, LogStash::Filters::Grok.

Anyone know what is causing this, or how we can investigate this further? While this is only a warning, a shutdown can take 10+ minutes, which causes issues for our weekly restart job.

Have we given the pipelines too few workers/resources, so there is a big queue when we try to shut down?

Are you using persistent queues?

Can you share your logstash.yml ?

Updated the OP with logstash.yml
The only two lines I can find that aren't commented out are:

path.logs: /data2/appelk/logs
api.http.host: 192.168.230.120

What is a little surprising is that the X-Pack GeoIP plugin is commented out, but we use GeoIP to get maps in Kibana. But maybe we use another plugin and that isn't needed (I was not the person who configured Logstash to support GeoIP and maps in Kibana).

Our setup is fairly standard, I would guess. But some of our .conf files are becoming more and more complicated.

I will also update OP with the .conf file for the specific pipeline that I showcased in the stacktrace.

You have multiple file inputs that appear to be reading the same files, which could cause problems.

You also have multiple file inputs sharing the same sincedb, which certainly does cause problems. Specify a different value for sincedb_path on each file input.

It's the same filename but different folders. When I tried to censor our internal server structure I made it vague/indistinguishable, sorry. Path would be something like:

path => "/data/thpftp04/logs/syslog"
path => "/data/thpftp05/logs/syslog"

Do I need to specify a sincedb_path when the path is different but the filename is the same?

Also, only a few of our pipeline configurations have multiple file inputs; we get the block in start_workers warning regardless of the number of file inputs.

Every file input needs a unique sincedb_path. There is no support in the code to share the sincedb file.

You could try getting a stack trace from the JVM when the shutdown hangs to see exactly what the various threads are doing.
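
A minimal sketch of one way to capture such a thread dump, assuming the JDK's jstack tool is on the PATH and is run as the same user as Logstash. The function name dump_threads and the default output path are made up for illustration; the pid has to be looked up separately.

```shell
# Sketch: write a JVM thread dump for a running Logstash process to a file.
# Assumptions: jstack (part of the JDK) is on PATH; the caller runs as the
# same user as the Logstash process. dump_threads and the default output
# path are illustrative names, not part of Logstash.
dump_threads() {
    dt_pid=$1
    dt_out=${2:-/tmp/logstash-threads-$dt_pid.txt}

    if [ -z "$dt_pid" ]; then
        echo "usage: dump_threads <pid> [output-file]" >&2
        return 2
    fi
    if ! command -v jstack >/dev/null 2>&1; then
        echo "jstack not found on PATH" >&2
        return 127
    fi

    # -l adds information about locks held by each thread.
    jstack -l "$dt_pid" > "$dt_out"
}
```

Run it while the shutdown is hanging, for example dump_threads 12345, then look at what the [pipeline]<file and >worker threads are waiting on.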

But do I have to set the sincedb_path manually when the path is different? The log has one line for each different file input:

Log lines about sincedb_path
[syslog-pipeline] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"/home/appelk/logstash-8.11.3/data-pipelines/plugins/inputs/file/.sincedb_a63658ad629b0c82675daa50f1c45924", :path=>["/data/boron/var/proftpd/logs/syslog"]}
[syslog-pipeline] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"/home/appelk/logstash-8.11.3/data-pipelines/plugins/inputs/file/.sincedb_4f3336a44f780b438c4f9c41b921217b", :path=>["/data/ddpftp02/var/proftpd/logs/syslog"]}
[syslog-pipeline] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"/home/appelk/logstash-8.11.3/data-pipelines/plugins/inputs/file/.sincedb_d24f0827a2c92ffcfcc28a970c8ae20d", :path=>["/data/thpftp05/var/proftpd/logs/syslog"]}
[syslog-pipeline] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"/home/appelk/logstash-8.11.3/data-pipelines/plugins/inputs/file/.sincedb_13c4ffb278eb9d1ea3af3f2cac1dbd76", :path=>["/data/thpftp04/var/proftpd/logs/syslog"]}
[syslog-pipeline] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"/home/appelk/logstash-8.11.3/data-pipelines/plugins/inputs/file/.sincedb_9d400acc9f1ec909fd8d9ce622a814e3", :path=>["/data/thpftp03/var/proftpd/logs/syslog"]}
[syslog-pipeline] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"/home/appelk/logstash-8.11.3/data-pipelines/plugins/inputs/file/.sincedb_1ec77c7d93a67bbc7c74bac190267162", :path=>["/data/thpftp02/var/proftpd/logs/syslog"]}

Yes, you do.

Have you tried to decrease the value of stat_interval?

I'm not sure exactly how Logstash shuts down, which tasks it will cancel and which ones it will finish, but I'm assuming that this may have some influence.

From the documentation you have this:

Discovering new files and checking whether they have grown/or shrunk occurs in a loop. This loop will sleep for stat_interval seconds before looping again. However, if files have grown, the new content is read and lines are enqueued. Reading and enqueuing across all grown files can take time, especially if the pipeline is congested. So the overall loop time is a combination of the stat_interval and the file read time.

Interesting, so the theory is that the file has grown while the file input is sleeping, and because it knows of the changes, it won't shut down before it has woken up and done one last parse?

Maybe that is the issue some of the times, but for this specific example the file is only written to every hour. And the stat_interval is 10 minutes. So most of the time it should not have any queued changes.

But most of our .conf files have a 30 sec stat_interval and read files that are written to continually, and we get the same warnings.

Maybe the issue is related to how we shut down? I see that the script my operations department has made just does:

~>kill {pid}

I'm a beginner here, so I'm asking basic questions.

  • Do we need to manually set the sincedb_path because we have multiple file inputs?
    • Or should we set sincedb_path for every file input regardless of the number of file inputs?
  • Is the reason we need to set it so that it can continue where it left off after a restart?
    • I thought that when the log said it created one for us, it was able to reuse it after a restart?
  • Do you think this is related to the block in start_workers warning issue?

Yes. The file input maintains an in-memory sincedb and persists it to disk wherever sincedb_path tells it to. That allows it to continue where it left off after a restart.

If you only have one file input you can use the default value. If you have more than one you cannot, because if two inputs try to persist their sincedbs to the same location then one will overwrite the other's data.

I don't know if the code that writes the sincedb to disk can deadlock and cause things to hang. It seems unlikely, but in any case you need to use different paths for the sincedbs.

As I mentioned before, get a stack trace from the JVM and see what the various threads are waiting on.

They are running on the same logstash instance? If so it makes no difference.

When you ask Logstash to shut down, it will start a graceful shutdown and stop all pipelines, which means emptying all queues, stopping all inputs, etc.

I would suggest that you try to reduce the stat_interval of those that have 10 minutes to see if it improves.

We have 8 Logstash installations. All using pipelines. Where the smallest have 3 pipelines and the biggest have 14.

We get this blocking warning on all of them when we try to shut down, in most situations.

It does not seem to cause any issues, only warnings, but since the time it takes to shut down is variable, and the kill command returns right away, we do not know when to start Logstash again.
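
One way a restart job could deal with that, sketched under assumptions: send the same TERM signal that plain kill sends, then poll until the process is really gone before starting Logstash again. The function name stop_and_wait and the 900 second default timeout are made up for illustration, and the check assumes the script runs as the same user as Logstash.

```shell
# Sketch: ask a process to stop, then block until it has actually exited.
# stop_and_wait and the default timeout are illustrative, not a Logstash tool.
# Returns 0 once the process is gone, 1 if it is still alive after the timeout.
stop_and_wait() {
    sw_pid=$1
    sw_timeout=${2:-900}   # seconds to wait before giving up
    sw_waited=0

    # Plain "kill" sends SIGTERM, which triggers the graceful shutdown.
    kill -TERM "$sw_pid" 2>/dev/null

    # "kill -0" sends no signal; it only tests whether the pid still exists
    # (and is signalable by this user).
    while kill -0 "$sw_pid" 2>/dev/null; do
        if [ "$sw_waited" -ge "$sw_timeout" ]; then
            return 1
        fi
        sleep 1
        sw_waited=$((sw_waited + 1))
    done
    return 0
}
```

A restart script could then call stop_and_wait "$pid" 900 and only start Logstash again when it returns 0.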

Most of them have a 30 sec stat_interval and the file is written to continuously by a Java application logger. But we have some pipelines that do database queries, and some that run terminal commands against Wildfly servers for stats.

The configurations with a 10 min stat_interval are the ones where the file is only written to every hour.

It feels like this issue is something basic we have forgotten, or maybe the way we start or shut down Logstash is wrong. Maybe we should/need to define pipeline.workers: X in pipelines.yml or something?

I was thinking I could check the queue with localhost:9600/_node/stats/pipelines?pretty before shutting down next time.
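
A small sketch of that check. Note that the logstash.yml above sets api.http.host to 192.168.230.120, so the API may not answer on localhost. The helper name stats_url is made up, and port 9600 is assumed to be the one Logstash picked from its default range.

```shell
# Sketch: build the node stats URL for a given API host and port.
# stats_url is an illustrative helper; host and port default to
# localhost and 9600 and should match api.http.host / api.http.port.
stats_url() {
    printf 'http://%s:%s/_node/stats/pipelines?pretty\n' "${1:-localhost}" "${2:-9600}"
}

# Example call against the host configured in this thread:
#   curl -s "$(stats_url 192.168.230.120 9600)"
```

In the response, the per-pipeline events and queue sections are the places to look for events that are still in flight.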

So it seems that was the problem. I adjusted all stat_interval values to 10 sec, and then Logstash shut down within 10-15 sec. So it seems like it is "blocked/waiting" for the input plugin to wake up, regardless of whether anything is in the queue or not.

Before changing the .conf files I got the stats for the Logstash instance, and nowhere did it say anything was queued. But it still took 10 min to shut down.

I find it strange that any stat_interval above the default spews blocking warnings into the log, yet it isn't reported all over the forum.

Thanks for all the help.

That seems reasonable. logstash will ask the file input to stop, and the input will ask the watcher to quit. The subscribe loop of the watcher will sleep for the stat_interval, and then check whether quit? is true.
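
The sleep-then-check pattern described above can be imitated with a tiny shell loop. This is only an illustration of why the stop request takes up to one full interval to be noticed, not the file input's actual code; watch_loop and the flag file standing in for quit? are made up.

```shell
# Sketch: a loop that only notices a stop request after its sleep finishes.
# watch_loop is illustrative; the flag file plays the role of quit?.
watch_loop() {
    wl_flag=$1       # path of a file whose existence means "please quit"
    wl_interval=$2   # seconds to sleep per iteration, like stat_interval

    while [ ! -e "$wl_flag" ]; do
        # (discovering and reading files would happen here)
        # The sleep is not interrupted when the flag appears, so a stop
        # request made during the sleep waits for the rest of the interval.
        sleep "$wl_interval"
    done
}
```

With an interval of 600 the loop can keep running for up to ten minutes after the flag file is created, which matches the shutdown times seen in this thread; with 10 it stops within about ten seconds.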
