Pipeline Delay 7.9

Hello everyone, I have the following setup:
3 Elasticsearch nodes
1 Logstash node
1 Kibana node
1 Winlogbeat agent

The Winlogbeat agent ships logs from a Windows machine that collects logs from all of my company's computers (500 EPS). It forwards the logs to Logstash, which, after a slight mutate, outputs them to the Elasticsearch cluster. Finally, I view them in Kibana.

The problem is that most (but not all) of these logs arrive with a delay of 30 minutes or so.

The time settings are correct on all machines in this pipeline, and they have suitable specifications (RAM utilization < 75%, CPU ~ 1%, and so on).

All the other consumers of this Windows machine (SIEM systems) have no such lag in log gathering, so the problem is on the ELK side. But I don't even know where to start troubleshooting.

As far as I can see, the logs show no errors on any of the machines. Monitoring is not enabled.

Hi @AleksandrN

Yup, 500 EPS should not have a delay of 30 mins, assuming that is the only data being ingested.

Let's start with a couple quick questions.

Is there other data being ingested? If so, how much EPS?

What are the RAM / CPU of the data nodes?

What is the JVM heap set to for the data nodes?

What is the RAM / CPU of Logstash, and what is the JVM heap for Logstash?

Perhaps post your Logstash config / pipeline... If you do, please use the formatting function above. </>

Also perhaps your winlogbeat.yml.

Hi @stephenb
No, no other data is ingested.
8 GB RAM (4 GB heap) and 4 CPUs, same for Elasticsearch and Logstash.

logstash.yml

# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
   pipeline.batch.size: 125
   pipeline.batch.delay: 50
#
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#
# node.name: test
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#
path.data: /var/lib/logstash
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
pipeline.workers: 8
#
# How many events to retrieve from inputs before sending to filters+workers
#
# pipeline.batch.size: 125
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
# pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: enabling this can lead to data loss during shutdown
#
# pipeline.unsafe_shutdown: false
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" will  automatically enable ordering if the 'pipeline.workers' setting
# is also set to '1'.
# "true" will enforce ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" will disable any extra processing necessary for preserving ordering.
#
pipeline.ordered: auto
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
# config.reload.automatic: false
#
# How often to check if the pipeline configuration has changed (in seconds)
# Note that the unit value (s) is required. Values without a qualifier (e.g. 60) 
# are treated as nanoseconds.
# Setting the interval this way is not recommended and might change in later versions.
#
# config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false
#
# ------------ HTTP API Settings -------------
# Define settings related to the HTTP API here.
#
# The HTTP API is enabled by default. It can be disabled, but features that rely
# on it will not work as intended.
# http.enabled: true
#
# By default, the HTTP API is bound to only the host's local loopback interface,
# ensuring that it is not accessible to the rest of the network. Because the API
# includes neither authentication nor authorization and has not been hardened or
# tested for use as a publicly-reachable API, binding to publicly accessible IPs
# should be avoided where possible.
#
http.host: 10.0.1.22
#
# The HTTP API web server will listen on an available port from the given range.
# Values can be specified as a single port (e.g., `9600`), or an inclusive range
# of ports (e.g., `9600-9700`).
#
# http.port: 9600-9700
#
# ------------ Module Settings ---------------
# Define modules here.  Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
#
# queue.type: memory
#
# If using queue.type: persisted, the directory path where the data files will be stored.
# Default is path.data/queue
#
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#
# queue.max_bytes: 1024mb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
dead_letter_queue.enable: true

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint
#
# http.host: "127.0.0.1"
#
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
#
# http.port: 9600-9700
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
log.level: debug
path.logs: /var/log/logstash
#
# ------------ Other Settings --------------
#
# Where to find custom plugins
# path.plugins: []
#
# Flag to output log lines of each pipeline in its separate log file. Each log filename contains the pipeline.name
# Default is false
# pipeline.separate_logs: false
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.monitoring.elasticsearch.api_key: "id:api_key"
#xpack.monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#xpack.management.enabled: false
#xpack.management.pipeline.id: ["main", "apache_logs"]
#xpack.management.elasticsearch.username: logstash_admin_user
#xpack.management.elasticsearch.password: password
#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.management.elasticsearch.cloud_id: management_cluster_id:xxxxxxxxxx
#xpack.management.elasticsearch.cloud_auth: logstash_admin_user:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.management.elasticsearch.api_key: "id:api_key"
#xpack.management.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file
#xpack.management.elasticsearch.ssl.truststore.password: password
#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.management.elasticsearch.ssl.keystore.password: password
#xpack.management.elasticsearch.ssl.verification_mode: certificate
#xpack.management.elasticsearch.sniffing: false
#xpack.management.logstash.poll_interval: 5s

#Additional settings
log.format: json

pipelines.yml

# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"

conf.d single config file

input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate_authorities => ["/etc/logstash/ca_dc.cer"]
    ssl_certificate => "/etc/logstash/mrsd-logstash.cer"
    ssl_key => "/etc/logstash/mrsd-logstash.key"
    ssl_verify_mode => "force_peer"
  }
}

filter {
        if ![winlog][event_data][RuleName] {
        mutate { add_field => { "[winlog][event_data][RuleName]" => "empty_name" } }
                }
        }

output {
  elasticsearch {
    hosts => ["https://mrsd-enode1.mrsd.test:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    ilm_enabled => true
    manage_template => false
    user => "logstash_internal"
    password => "${LPASS}"
    ssl => true
    cacert => '/etc/logstash/ca_dc.cer'
  }
}

winlogbeat.yml

###################### Winlogbeat Configuration Example ##########################

# This file is an example configuration file highlighting only the most common
# options. The winlogbeat.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/winlogbeat/index.html

#======================= Winlogbeat specific options ==========================

# event_logs specifies a list of event logs to monitor as well as any
# accompanying options. The YAML data type of event_logs is a list of
# dictionaries.
#
# The supported keys are name (required), tags, fields, fields_under_root,
# forwarded, ignore_older, level, event_id, provider, and include_xml. Please
# visit the documentation for the complete details of each option.
# https://go.es.io/WinlogbeatConfig
winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
  - name: Security
  - name: System
  - name: ForwardedEvents

#==================== Elasticsearch template setting ==========================

setup.template.settings:
  index.number_of_shards: 1
  #index.codec: best_compression
  #_source.enabled: false

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
name: wef

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging


#============================== Dashboards =====================================
# These settings control loading the sample dashboards to the Kibana index. Loading
# the dashboards is disabled by default and can be enabled either by setting the
# options here or by using the `setup` command.
#setup.dashboards.enabled: false

# The URL from where to download the dashboards archive. By default this URL
# has a value which is computed based on the Beat name and version. For released
# versions, this URL points to the dashboard archive on the artifacts.elastic.co
# website.
#setup.dashboards.url:

#============================== Kibana =====================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
#setup.kibana:

  # Kibana Host
  # Scheme and port can be left out and will be set to the default (http and 5601)
  # In case you specify and additional path, the scheme is required: http://localhost:5601/path
  # IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
  #host: "localhost:5601"

  # Kibana Space ID
  # ID of the Kibana Space into which the dashboards should be loaded. By default,
  # the Default Space will be used.
  #space.id:

#============================= Elastic Cloud ==================================

# These settings simplify using winlogbeat with the Elastic Cloud (https://cloud.elastic.co/).

# The cloud.id setting overwrites the `output.elasticsearch.hosts` and
# `setup.kibana.host` options.
# You can find the `cloud.id` in the Elastic Cloud web UI.
#cloud.id:

# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `<user>:<pass>`.
#cloud.auth:

#================================ Outputs =====================================

# Configure what output to use when sending the data collected by the beat.

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
output.logstash:
  enabled: true
  # The Logstash hosts
  hosts: ["mrsd-logstash.mrsd.test:5044"]
  ssl.certificate_authorities: ["C:/Program Files/Winlogbeat/ca_dc.cer"]
  ssl.certificate: "C:/Program Files/Winlogbeat/wec-rmd.cer"
  ssl.key: "C:/Program Files/Winlogbeat/wec-rmd.key"

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

#================================ Processors =====================================

# Configure processors to enhance or manipulate events generated by the beat.

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
logging.level: debug
logging.to_files: true
logging.files:
  path: C:\Program Files\Winlogbeat\logs
  name: winlogbeat
  keepfiles: 7
  permissions: 0644

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

#============================== Xpack Monitoring ===============================
# winlogbeat can export internal metrics to a central Elasticsearch monitoring
# cluster.  This requires xpack monitoring to be enabled in Elasticsearch.  The
# reporting is disabled by default.

# Set to true to enable the monitoring reporter.
#xpack.monitoring.enabled: false

# Uncomment to send the metrics to Elasticsearch. Most settings from the
# Elasticsearch output are accepted here as well. Any setting that is not set is
# automatically inherited from the Elasticsearch output configuration, so if you
# have the Elasticsearch output configured, you can simply uncomment the
# following line.
#xpack.monitoring.elasticsearch:

#xpack.monitoring:
#  enabled: true
#  elasticsearch:
#    hosts: ["10.0.1.24:9200"]
#================================= Migration ==================================

# This allows to enable 6.7 migration aliases
#migration.6_to_7.enabled: true

Are you sure there's not a time difference somewhere? Does the time in these logs match the system time, which matches the times on the other steps in the pipeline?

If that's ok, then maybe check the refresh_interval for the index.
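
A rough Python sketch of that check: the index settings API (`GET <index>/_settings`) returns only settings that were set explicitly unless defaults are requested, so a missing `refresh_interval` means the default applies. The helper name, index names and sample response below are made up for illustration; they are not output from this cluster.

```python
from typing import Dict, Optional


def explicit_refresh_intervals(settings_response: Dict) -> Dict[str, Optional[str]]:
    """Map each index name to its explicitly set refresh_interval (None if unset)."""
    result = {}
    for index_name, body in settings_response.items():
        index_settings = body.get("settings", {}).get("index", {})
        result[index_name] = index_settings.get("refresh_interval")
    return result


# Hypothetical body of GET winlogbeat-*/_settings (index names are assumptions).
sample = {
    "winlogbeat-7.9.1-2020.09.21": {"settings": {"index": {"number_of_shards": "1"}}},
    "winlogbeat-7.9.1-2020.09.20": {"settings": {"index": {"refresh_interval": "30m"}}},
}

for name, interval in explicit_refresh_intervals(sample).items():
    print(name, "->", interval or "not set (default applies)")
```

In this made-up sample the second index would be the suspicious one, since documents would only become searchable every 30 minutes.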

Hi @AleksandrN

Thanks for the thoughtful, well-formatted replies.

At first glance your configs look pretty good (I could be missing something).

A couple of super minor items, which I do not think are your issue.

1st, super minor and not an issue: the pipeline params in logstash.yml should be left-justified.

pipeline.batch.size: 125
pipeline.batch.delay: 50

2nd, what is the JVM heap size for Logstash? (Again, I don't think this is the issue.) You could / should set it to, say, 4GB - 6GB. I am not sure why we say no more than 8GB; you can scale Logstash up much more than that. But for your machine 4-6 GB should be fine / plenty.

Now... all that said... hmm yes... this seems odd.

As @warkolm beat me to it...

I suppose someone could have set refresh_interval to 30m and meant 30s.

But my instincts also want to think there is a time zone / time setting issue, perhaps on the Windows hosts (yes, I know you said you checked), so let's check a few things. When I have seen this behavior in the past, especially the 30 mins, it was an incorrect time zone setting on the originating hosts... or it could be Elasticsearch as well.

Can you test this on a single Windows host?

You can test this by running 1 host... keep Discover open for 1 hour and check the data flowing in... and look closely at the events / logs...

Also, you could do something as simple as setting up Metricbeat on the Logstash host, pointing its output at the same Logstash, and seeing if the Metricbeat data is delayed...

Also check those documents that are coming in at the regular interval after the majority of the events in your screenshot above and see what they are.

Let us know what you find out.

Logstash heap is also 4 GB (out of 8 GB RAM total).

I can confirm that all time zones are set correctly.

Refresh interval is not set, so the default behavior applies.

Those first events are regular Windows events from the System channel.

Also, I have my old cluster (Elasticsearch, Kibana, Logstash, all on a single node). When I redirect Winlogbeat to its Logstash, the problem persists.

In the Winlogbeat logs it seems like the agent just reads events with a huge delay, even though newer events are present as well:

2020-09-21T17:21:58.824+0300 DEBUG [eventlog_detail] eventlog/wineventlog.go:346 WinEventLog[ForwardedEvents] XML=<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'><System><Provider Name='Microsoft-Windows-Sysmon' Guid='{5770385F-C22A-43E0-BF4C-06F5698FFBD9}'/><EventID>3</EventID><Version>5</Version><Level>4</Level><Task>3</Task><Opcode>0</Opcode><Keywords>0x8000000000000000</Keywords><TimeCreated SystemTime='2020-09-21T13:50:41.653057100Z'/><EventRecordID>36091106</EventRecordID><Correlation/><Execution ProcessID='1692' ThreadID='2776'/><Channel>Microsoft-Windows-Sysmon/Operational</Channel><Computer>hostname</Computer><Security UserID='S-1-5-18'/></System><EventData><Data Name='RuleName'></Data><Data Name='UtcTime'>2020-09-21 13:50:39.866</Data>...
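
To put a number on that, here is a small Python sketch that subtracts the event's `TimeCreated SystemTime` from the timestamp of the Winlogbeat debug line above. Both values are copied from that line; the variable names are just for illustration.

```python
from datetime import datetime, timezone

# Timestamp of the Winlogbeat debug line (local time, UTC+3).
read_at = datetime.strptime("2020-09-21T17:21:58.824+0300", "%Y-%m-%dT%H:%M:%S.%f%z")

# TimeCreated SystemTime from the event XML (UTC), truncated to microseconds
# because datetime cannot hold the 100 ns digit.
created_at = datetime.strptime(
    "2020-09-21T13:50:41.653057", "%Y-%m-%dT%H:%M:%S.%f"
).replace(tzinfo=timezone.utc)

lag = read_at - created_at
print(lag)  # 0:31:17.170943
```

So this event was already about 31 minutes old when Winlogbeat read it, which matches the delay seen in Kibana and suggests the lag exists before the event ever reaches Logstash.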

Also, if I'm collecting from a single host, there is no such lag.

What you are describing (1 beat is fine... many beats are slow...) would seem to indicate significant backpressure / a bottleneck then.

When I look closer at your image, your total rates are much higher than 500 EPS; it looks like 10-15K EPS. How many Winlogbeats are you running? 10-15K EPS is still not a huge amount, so I am unclear why you are seeing this, but I might try scaling first.

You might want to look at this and this

Do you see any 429 Rejections?
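
Besides 429 responses in the Logstash log, rejections also show up as the `rejected` counter of the write thread pool, which you can fetch with `GET _cat/thread_pool/write?format=json&h=node_name,name,active,queue,rejected`. A minimal Python sketch for reading that output follows; the helper name and the sample rows (including the node names) are hypothetical.

```python
from typing import Dict, List


def nodes_with_rejections(rows: List[Dict[str, str]]) -> Dict[str, int]:
    """Return {node_name: rejected_count} for nodes whose write pool rejected requests."""
    rejected = {}
    for row in rows:
        # The cat API returns numbers as strings in JSON format.
        count = int(row.get("rejected", "0"))
        if count > 0:
            rejected[row["node_name"]] = count
    return rejected


# Hypothetical rows, shaped like the JSON output of the request above.
sample_rows = [
    {"node_name": "enode1", "name": "write", "active": "1", "queue": "0", "rejected": "0"},
    {"node_name": "enode2", "name": "write", "active": "4", "queue": "200", "rejected": "37"},
    {"node_name": "enode3", "name": "write", "active": "0", "queue": "0", "rejected": "0"},
]

print(nodes_with_rejections(sample_rows))  # {'enode2': 37}
```

An empty result would suggest Elasticsearch is not pushing back on bulk requests, so the bottleneck is more likely upstream.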

Can you scale up the Logstash and ES nodes, maybe try twice as big, and see?

Plus I would install Metricbeat on the Logstash and Elasticsearch nodes so that you can monitor the stack.

Best practice is a separate cluster for the monitoring stack, but you can probably point to your existing one... except if it is overwhelmed, then monitoring there does not help.


40k per minute ~ 670 EPS
I use only a single Winlogbeat and it's my only event source.
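
The arithmetic, as a quick Python sketch. The backlog figure is only an illustration under the assumption that the 30 minute lag is a steadily draining in-order queue, which has not been confirmed here.

```python
events_per_minute = 40_000  # rate read off the Kibana histogram

# Events per second.
eps = events_per_minute / 60
print(round(eps))  # 667

# If events are read in order at that rate and are 30 minutes late,
# roughly this many events are waiting between source and reader (assumption).
lag_minutes = 30
backlog_events = events_per_minute * lag_minutes
print(backlog_events)  # 1200000
```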

I will try to tune the settings a bit in a few days, once I have finished other tasks, and will also start using monitoring.

But for now it does not look like a performance issue: the same problem occurs with Logstash + Elasticsearch on a single node and with Logstash + 3 Elasticsearch nodes on separate machines (so scaled out about 4x).

It seems like Winlogbeat may be the problem, but since the lag does not fluctuate (it is not tied to bursts and pauses in the logs), it looks more like a config issue or a bug.

Hahah yes, apologies, I did the math wrong... too early in the AM. :slight_smile:

And yes, if it is less than 1K EPS, a very small cluster / single node can / should handle it.

My 2GB single-node Docker container on my Mac does 2.5K EPS easily.

Yeah, take a look at Winlogbeat. It can emit A LOT of data and has some nuances... 1 host doing 670 EPS is kind of a lot, in my opinion.

I would also suggest, as you do... file another Discuss topic with Winlogbeat in the title, and you should get more responses from Winlogbeat experts there.

I am a little confused whether you see the issue on a single host or not?

Good luck, let us know what you find out...
