Slow Ingestion of Final Log Chunks (Filebeat + Logstash + Elasticsearch)

Setup

We are using the ELK stack to ingest logs from a file share. The logs are pre-existing files, and ingestion starts from scratch when the stack is brought up.

Our setup:

  • Elasticsearch

    • 6 GB heap
    • 6 replicas
  • Logstash

    • 3 GB heap
    • 2 replicas
  • Filebeat

    • 3 replicas
    • 10 GB disk queue
    • Some tuning in filebeat.yml

Problem

We noticed that ingesting the last portion of logs takes significantly longer compared to the beginning of the ingestion process.

We tested two scenarios:

  • Scenario 1:

    • 3.2 million logs total
    • 2.8 million ingested in 45 minutes
    • The remaining 400,000 logs took over 1 hour
  • Scenario 2:

    • 5.2 million logs total
    • 4.8 million ingested in 45 minutes
    • The remaining 400,000 logs again took over 1 hour

Here's a graph showing the total logs vs. processed logs over time. The slowdown is clear in the final stage of ingestion.


Question

What causes this slowdown when ingesting the last few hundred thousand logs? Could this be related to how Filebeat reads files or how Logstash handles remaining batches?

We would appreciate any insights into:

  • How Filebeat and Logstash handle file input near the end
  • What might be causing this performance drop
  • What we could tune or look into to fix or avoid this

Thanks in advance for your help.

Hi @RafaelXokito Welcome to the community.

What versions are you on?

Please share your filebeat and logstash configs.

Are you setting the _id field manually?

Do you mean 6 nodes or replica shards?

Curious what type of HW / CPU / RAM / storage?

To me there are 2 overall questions...

Overall Ingestion Speed ... 2.8M / (45 min × 60 sec) ≈ 1K/sec ingest... seems a bit slow...

And Why the Slow Down ... Are you manually setting the _id field? That can have exactly this effect...

We are currently using version 7.17.6 across all ELK components.

Yes, we define the document_id in the output section of logstash.conf

Actually, both - 6 nodes and 6 replicas, with one replica on each node.

logstash resources:

logstashJavaOpts: "-Xmx3g -Xms3g"

resources:
  requests:
    cpu: "1000m"
    memory: "1536Mi"
  limits:
    cpu: "1900m"
    memory: "5Gi"

filebeat resources:

  resources:
    requests:
      cpu: "100m"
      memory: "100Mi"
    limits:
      cpu: "1900m"
      memory: "5Gi"

Here are the configurations for Filebeat:

filebeat.inputs:
  - type: filestream
    id: project-logs-id
    paths:
      - /data/project/logs/**/*.log*
    close.reader.on_eof: true
queue.disk:
  path: "/usr/share/filebeat/diskqueue"
  max_size: 1GB

output.logstash:
  hosts: '${LOGSTASH_HOSTS}'
  loadbalance: "${LOAD_BALANCING:false}"

Here are the configurations for Logstash:

input {
    beats {
        port => 5044
    }
}

filter {
    grok {
        match => {"[log][file][path]" => "/data/project/logs/(?<logFolder>[^/]+)/.*\.log"}
    }

    grok {
        match => {"message" => "^(?:\x1b\[\d+m)*%{TIMESTAMP_ISO8601:entryTimestamp} %{LOGLEVEL:logLevel}\s+\[(?<hostname>[^]]+)\] \[(?<logger>[^]]+)\] \[(?<thread>[^]]+)\] \((?<transactionId>[^)]+)\) \((?<businessFlowId>[^)]*)\) operation=%{NOTSPACE:operation}, auditEventMessage=%{GREEDYDATA:auditEventMessage}"}
    }

    date {
        match => ["entryTimestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
        target => "@timestamp"
        timezone => "UTC"
    }

    json {
        source => "auditEventMessage"
        target => "auditEvent"
    }

    mutate {
        remove_field => [ "host", "[agent][hostname]", "[agent][ephemeral_id]", "[agent][id]", "[agent][name]", "[agent][type]", "[agent][version]", "[ecs][version]" ]
    }

    ruby {
        code => 'fields = ["startedAt", "finishedAt", "transactionId", "elapsedTime", "databaseElapsedTime", "componentElapsedTime", "eventType", "componentType", "host",
                      "transactionStatus", "forcedRelease", "errorCode", "errorMessage", "isSecurityEvent", "outputRequest",
                      "outputResponse", "inputRequest", "inputResponse", "thread", "logger", "businessFlowId"]

            c = event.get("auditEvent")
            if c
                c.each do |index, value|
                    if not fields.include? index
                        event.set(index, value)
                        event.remove("[auditEvent][#{index}]")
                    end
                end
            end'
    }

    mutate {
        rename => { "logLevel" => "[auditEvent][logLevel]" }
        rename => { "hostname" => "[auditEvent][hostname]" }
        rename => { "logger" => "[auditEvent][logger]" }
        rename => { "thread" => "[auditEvent][thread]" }
        rename => { "operation" => "[auditEvent][operation]" }
        rename => { "entryTimestamp" => "[auditEvent][entryTimestamp]" }
        rename => { "type" => "[auditEvent][type]" }
        remove_field => [ "auditEventMessage", "message", "path", "[auditEvent][businessFlowId]", "transactionId" ]
    }
}

output {
    if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] and "_jsonparsefailure" not in [tags] and "_rubyexception" not in [tags] {
        elasticsearch {
            hosts => ["${ELASTICSEARCH_HOST:elasticsearch:9200}"]
            cacert => "${ELASTICSEARCH_CERT:/usr/share/logstash/config/certs/elastic-certificate.crt}"
            user => "${ELASTICSEARCH_USERNAME:default}"
            password => "${ELASTICSEARCH_PASSWORD:default}"
            ssl_certificate_verification => "${CERTIFICATE_VERIFICATION:false}"
            index => "raw-%{logFolder}"
            document_id => "%{[auditEvent][transactionId]}"
            action => "update"
            doc_as_upsert => true
        }
    }
}

This means that each insert has to be treated as a potential update, so Elasticsearch must first check whether the document already exists before writing it. This adds a lot of overhead: indexing slows down as the index grows, because the lookup for each ID takes longer. The slower your storage, the larger the slowdown.

What type of storage are you using with Elasticsearch?

The most common reason people give for generating their own id is to avoid duplicates. This is (in my view) often a fear of an issue rather than a real issue. Obviously unintended duplicates are not ideal, but in many many use cases they would not be a catastrophe, or anything like, either. Also, it's often not 100% clear if the self-generated _id is even preventing duplicates.

Your issue here looks very much like a demonstration of the downside of self-generated _ids: progressively slower ingest.

You said that you have 3 replicas of Filebeat. Is each of those instances reading different logs, or are they all reading the same logs and you are using a custom _id to avoid duplicates?

Also, you are using the disk queue for Filebeat. This can impact overall performance: each event is read from disk, written to disk in the queue, then read from disk again, so it roughly doubles the disk I/O. Depending on the disk speed, this can be an issue.

What disk type is used by Filebeat? And what are the Elasticsearch specs, especially the disk type? The overall indexing rate normally depends more on Elasticsearch than on the other tools.

Did you make any changes in logstash.yml or pipelines.yml? It is not clear how many workers you are using or whether you are using the default batch size, which can be pretty inefficient.

Thanks for all your answers.

This behaviour still happens with either 3 Filebeat replicas or 1. Below you can see an earlier run with only 1 replica and fewer logs (~3.2M).

I still need to investigate the use of memory instead of disk to hold the queues.

Regarding the manual document_id generation: after removing it as shown below, the ingest rate did increase, as shown in the attached image. However, I believe our problem is related to the number of duplicate logs we have. For the same log files where we had nearly 6 million logs ingested without duplicates, with duplicates (after removing the manually generated document_id) we reached 17.5 million. Note that the time taken to ingest all the logs is practically the same for both test runs.

output {
    if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] and "_jsonparsefailure" not in [tags] and "_rubyexception" not in [tags] {
        elasticsearch {
            hosts => ["${ELASTICSEARCH_HOST:elasticsearch:9200}"]
            cacert => "${ELASTICSEARCH_CERT:/usr/share/logstash/config/certs/elastic-certificate.crt}"
            user => "${ELASTICSEARCH_USERNAME:default}"
            password => "${ELASTICSEARCH_PASSWORD:default}"
            ssl_certificate_verification => "${CERTIFICATE_VERIFICATION:false}"
            index => "raw-%{logFolder}"
            action => "index"
        }
    }
}

You have duplicates with only a single Filebeat? You didn't say whether the Filebeat replicas are reading the same files or not.

If they are reading the same files, then you should have only one instance reading the files.

When using a file input you should have only one filebeat reading from it, not multiple instances.

Can you replicate this test using just one Filebeat and without the disk queue? The disk queue is expected to be slower, especially if the disk used by Filebeat is not fast.

What about the other questions? What is the disk type where these logs are stored? Did you make any changes to logstash.yml or pipelines.yml? What are the specs of your Elasticsearch cluster, including the disk type?

Oh.

Well, that is indeed the most common reason people use their own _id :slight_smile:

But it implies/suggests that when you had 3 instances running in parallel, you were probably "ingesting" 3x more logs than necessary and just updating the same document 3 times.


Logs... the question is why there are so many duplicates in the first place.

@leandrojmp is asking the KEY questions: how are the logs being read and shipped?

Another important thing is, what is the average size of your events? Can you share a sample log line?

The log files themselves contain duplicated logs. We receive a ton of log files from a third party and we need to process them; unfortunately the logs contain duplicates (a lot of duplicates). That means that even with a single Filebeat replica I have duplicates.

The AKS nodes for Filebeat are Standard_DS2_v2 with a 128 GB managed disk.

You are completely right, I forgot to mention my pipelines.yml, shown below. The logstash.yml I already pasted earlier.

- pipeline.id: project-pipeline
  pipeline.workers: 4
  pipeline.batch.delay: 100
  pipeline.ecs_compatibility: disabled
  pipeline.ordered: false
  path.config: /usr/share/logstash/pipeline/logstash.conf

Node pools (all running node image AKSUbuntu-2204gen2containerd-202503.21.0, Kubernetes 1.29.8, Ubuntu Linux, virtual network DUMMY):

  • projectelastic: 3 nodes, B8s_v2, 256 GB managed OS disk, max 15 pods per node, mode User
  • projectelk: 2 nodes, DS2_v2, 128 GB managed OS disk, max 10 pods per node, mode User
  • elkbase: 3 nodes, DS2_v2, 128 GB managed OS disk, max 10 pods per node, mode System

Where 'projectelastic' is for elasticsearch, and 'projectelk' and 'elkbase' for the rest.

I can't give you a sample log. But each log entry has about 1300 characters. You can see the regex in the logstash grok filter I uploaded earlier.

You could maybe have shared this a bit earlier in the thread...?

There are other ways you could de-duplicate your data before ingest; it could even be done in Logstash itself, see

But that would add significant complexity.

Someone recently had a similar issue: they were using action => "create" the first time docX was seen, then getting (expected) errors on further create (not update) requests for the same docX (same _id), and wanted to squash the error (or was it a warning?) messages. This might be quicker.
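
For illustration, here is a minimal sketch of that approach, not tested against your pipeline; it keeps the document_id you already use and assumes the rest of the output (hosts, TLS, credentials, the failure-tag conditional) stays as in your existing config:

output {
    elasticsearch {
        hosts       => ["${ELASTICSEARCH_HOST:elasticsearch:9200}"]
        index       => "raw-%{logFolder}"
        document_id => "%{[auditEvent][transactionId]}"
        # "create" only indexes a document if its _id does not exist yet;
        # a duplicate gets a 409 version_conflict error and is dropped,
        # avoiding the get-then-update cycle of doc_as_upsert.
        action      => "create"
        # Newer releases of the elasticsearch output plugin can also hide the
        # resulting (expected) error log lines; check whether your plugin
        # version supports something like:
        # silence_errors_in_log => ["version_conflict_engine_exception"]
    }
}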

As soon as I understood that the duplicate logs were the problem, I mentioned it.

I will look into these options and keep you informed of the results. Thanks for the suggestions though.


The thread I was half-recalling was sort of similar, but the other way round (lots of errors on failed creates, rather than asking ES to do unnecessary updates).

Good luck.

These are the machine types, but what is the disk type? In Azure you can have different disk types, like Standard HDD, Standard SSD, Premium SSD, etc.

You need to check the disk type of the disks where the Elasticsearch data is being stored and from where Filebeat is reading.

But, if it is not Standard HDD, I think that this would not be the problem.

Yeah, in this case you would need the custom _id in Logstash, but you should still use only one Filebeat instance. Since your source of data is files, adding more instances will not help at all and will just make things worse, unless each instance consumes a different set of files.

I think you can consider the average size to be around 1.5 KB, so every 1 million events would amount to roughly 1.5 GB. If you have, for example, 6 million unique events, that is something close to 10 GB, which should not take this much time to index.

The tricky thing when troubleshooting ingestion issues is that there are multiple pieces that need to be troubleshot individually, and the final ingestion rate will depend on the performance of each one of them.

One thing that I noticed and that looks a little confusing: you seem to be using a json filter on the auditEventMessage field with auditEvent as the target, and then a ruby filter, which can be slow in some cases, to walk this JSON object and move some keys into the root of the document, except for some pre-defined keys.

Why not do the inverse operation? Use the json filter to parse the field directly into root and then a mutate rename filter to rename the pre-defined keys.

In my experience the ruby filter can add some extra time in the processing of each event.

My suggestions for test would be:

  • First use only one filebeat instance with the memory queue.
  • Increase the pipeline.batch.size for this pipeline; start with something like 500 or 1000 (the default value is 125). See the pipelines.yml sketch after this list.
  • If possible, change the index.refresh_interval setting for the destination index. The default is 1s, which in my experience can be a performance killer; in my case all my data has at least a 15s refresh interval.
  • Try to change the logstash pipeline to remove the ruby filter and rely on the mutate rename to achieve the same result.
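
For the batch-size suggestion, a minimal sketch based on the pipelines.yml already shared earlier in the thread (the value is a starting point to measure and tune, not a recommendation):

- pipeline.id: project-pipeline
  pipeline.workers: 4             # usually matched to the CPU cores actually available to Logstash
  pipeline.batch.size: 1000       # default is 125; larger batches mean fewer, larger bulk requests
  pipeline.batch.delay: 100
  pipeline.ecs_compatibility: disabled
  pipeline.ordered: false
  path.config: /usr/share/logstash/pipeline/logstash.conf

Keep in mind that each worker holds a batch in memory, so larger batch sizes also increase Logstash heap usage.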

I did two tests with the Fingerprint plugin. The first used concatenate_all_fields, which reduced the ingest speed, and the deduplication didn't work either, because of an "offset" field that resulted in a different hash for duplicated events. Below you can see the fingerprint configuration, used with document_id => "%{fingerprint}" inside the output block, and also an image with the event ingestion rate.

fingerprint {
    concatenate_all_fields => "true"
    method => "MURMUR3"
    target => "fingerprint"
}
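
If you wanted to stay with concatenate_all_fields, one option, assuming the differing "offset" is Filebeat's [log][offset] field (the byte position in the source file), would be to drop such per-read metadata before fingerprinting. A rough sketch:

filter {
    # Hypothetical: remove metadata that differs between otherwise identical
    # duplicate events, so concatenate_all_fields hashes only the log content.
    mutate {
        remove_field => [ "[log][offset]" ]
    }

    fingerprint {
        concatenate_all_fields => true
        method => "MURMUR3"
        target => "fingerprint"
    }
}

That said, selecting explicit sources, as in your second test, is arguably safer, since new metadata fields cannot silently change the hash.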

The second test used concatenate_sources, selecting only the fields relevant for the uniqueness check. For this test I also replaced the previously mentioned ruby script by moving the auditEventMessage object to the root and then renaming each attribute of interest into the auditEvent object. In this test I also changed the fingerprint method to SHA256. Below is a screenshot of the results.

json {
  # No target: the parsed fields are written to the root of the event
  source => "auditEventMessage"
}

mutate {
  rename => {
    "startedAt"           => "[auditEvent][startedAt]"
    "finishedAt"          => "[auditEvent][finishedAt]"
    "transactionId"       => "[auditEvent][transactionId]"
    "elapsedTime"         => "[auditEvent][elapsedTime]"
    "databaseElapsedTime" => "[auditEvent][databaseElapsedTime]"
    "componentElapsedTime"=> "[auditEvent][componentElapsedTime]"
    "eventType"           => "[auditEvent][eventType]"
    "componentType"       => "[auditEvent][componentType]"
    "host"                => "[auditEvent][host]"
    "transactionStatus"   => "[auditEvent][transactionStatus]"
    "forcedRelease"       => "[auditEvent][forcedRelease]"
    "errorCode"           => "[auditEvent][errorCode]"
    "errorMessage"        => "[auditEvent][errorMessage]"
    "isSecurityEvent"     => "[auditEvent][isSecurityEvent]"
    "outputRequest"       => "[auditEvent][outputRequest]"
    "outputResponse"      => "[auditEvent][outputResponse]"
    "inputRequest"        => "[auditEvent][inputRequest]"
    "inputResponse"       => "[auditEvent][inputResponse]"
    "thread"              => "[auditEvent][thread]"
    "logger"              => "[auditEvent][logger]"
    "businessFlowId"      => "[auditEvent][businessFlowId]"
    "logLevel"            => "[auditEvent][logLevel]"
    "hostname"            => "[auditEvent][hostname]"
    "operation"           => "[auditEvent][operation]"
    "entryTimestamp"      => "[auditEvent][entryTimestamp]"
    "type"                => "[auditEvent][type]"
  }
}
...
fingerprint {
  concatenate_sources => "true"
  source => ["auditEvent","transactionId"]
  method => "SHA256"
  target => "fingerprint"
}

I will keep this thread updated as I try to improve the ingestion rate. Next I will try using a memory queue and increasing the logstash pipeline attributes.

After applying some changes, namely:

  • Eliminating all warnings & errors in the logstash logs
  • Switching Filebeat queue storage from disk to memory:
queue.mem:
  events: 32768
  flush.min_events: 8192
  flush.timeout: 3s
  • Elasticsearch index.refresh_interval increased from the default 1s to 15s on the destination indices (see the settings sketch after this list)
  • logstash CPU increased to 2500m
  • logstash pipeline.batch.size increased from the default 125 to 2000
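
For reference, a minimal sketch of the refresh interval change, in Kibana Dev Tools syntax, assuming the destination indices match raw-* (putting the setting in an index template instead would also cover newly created indices):

PUT raw-*/_settings
{
  "index": {
    "refresh_interval": "15s"
  }
}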

With these changes, and compared to the results attached to my previous reply in this discussion, we noticed that we had been missing some events (not relevant events, but events that we were not seeing), in particular due to the errors and warnings in the logstash conf that have now been fixed. We also noticed performance improvements after applying the described changes.

Looking for even more performance improvements, what else could I test?

Did you try alternatives to

            action => "update"
            doc_as_upsert => true

Specifically just doing action => "create", and letting any future duplicates just (harmlessly) fail?

(obviously harmlessly if the later docs truly are just duplicates)