Lag in Filebeat to Kibana

Hi,

I am using Filebeat in a Kubernetes environment. The data flow is as below:
Filebeat -> Logstash -> Elasticsearch -> Kibana

I am using Filebeat version 7.10.0.

I am seeing a lag between Filebeat and Kibana, even though Elasticsearch, Logstash, and Filebeat all look healthy. Could you help me here?

Filebeat config:

    filebeat.autodiscover:
      providers:
        - type: kubernetes
          scope: cluster
          node: ${NODE_NAME}
          hints.enabled: true
          add_resource_metadata:
            labels:
              enabled: true
          templates:
            - condition:
                equals:
                  kubernetes.labels.*****: "true"
              config:
                - type: container
                  multiline.pattern: '**Pattern***'
                  multiline.negate: false
                  multiline.match: after
                  paths:
                    - /var/log/containers/*-${data.kubernetes.container.id}.log
                  exclude_lines: ["^\\s+[\\-`('.|_]"]  # drop asciiart lines
                  fields:
                    cluster: {{ .Values.elk.clustername }}
                    container: ${data.kubernetes.container}

    processors:
      - add_cloud_metadata:
      - add_host_metadata:

    cloud.id: ${ELASTIC_CLOUD_ID}
    cloud.auth: ${ELASTIC_CLOUD_AUTH}
    monitoring:
      enabled: true
      elasticsearch:
        hosts: ['https://${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}']
        username: ${ELASTICSEARCH_USERNAME}
        password: ${ELASTICSEARCH_PASSWORD}
        ssl:
          enabled: true
          verification_mode: none
    output.logstash:
      hosts: ['${LOGSTASH_HOST}:${LOGSTASH_PORT}']

How exactly are you finding that there is a lag?

What is the configuration of your elasticsearch cluster? How many nodes, memory, heap, disk type?

Also, what is your Logstash pipeline?
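If you do not have an exact number for the lag, one way to measure it is to stamp each event with the time Logstash processed it, then compare that field against the event's own timestamp in Kibana. A minimal sketch of such a filter (the `[logstash][processed_at]` field name is just an example):

    filter {
      ruby {
        # stamp the event with the wall-clock time Logstash handled it;
        # the gap between this field and the event's @timestamp shows
        # how far behind the pipeline is running
        code => "event.set('[logstash][processed_at]', Time.now.utc.strftime('%FT%T.%LZ'))"
      }
    }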

@leandrojmp The lag time is not fixed. When there is a lot of log data (working hours) it ranges from 10 minutes to 3 hours, but outside peak hours we don't see this lag.

ES Config:
2 hot nodes (32 GB RAM, 16 GB max heap), disk: EBS
1 warm node (32 GB RAM, 16 GB max heap), disk: EBS

Logstash Pipeline:
3 Logstash pipelines with 4 GB heap

@leandrojmp could you please help here? I have also received a few messages from others saying they are seeing the same issue. Could you help us resolve this?

You didn't share your Logstash pipeline; you need to share the pipeline for the data ingestion that is lagging.

Without the Logstash pipeline it is not possible to even know what is happening.

Also, EBS storage is normally pretty slow, and depending on the number of events this can hurt Elasticsearch's indexing performance, which in turn can cause a backlog in Logstash. It is recommended to use provisioned IOPS when using EBS storage.
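If you suspect Elasticsearch is the bottleneck, the write thread pool is a quick thing to check; a growing queue or non-zero rejections during peak hours would point to indexing back-pressure on the cluster rather than a problem in Filebeat or Logstash. For example, against your cluster:

    GET _cat/thread_pool/write?v&h=node_name,active,queue,rejected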

@leandrojmp Please find the Logstash pipeline below. Let me know if you need any more details that could help with debugging.

    input {
      beats {
        host => "0.0.0.0"
        port => 5044
      }
    }
    filter {
      if [log][file][path] =~ "/var/lib/docker/container" {
        drop { }
      }
      if [message] == "" {
        drop { }
      }
      if [agent][type] == "filebeat" {
        mutate {
          add_field => { "log_level" => "UNDEFINED" }
        }
        if [message] =~ /.*Exception.*/ {
          mutate {
            replace => { "log_level" => "ERROR" }
          }
        }
        if [message] =~ /.*co\.elastic\.apm\.agent.*/ {
          drop { }
        }
        if [kubernetes][container][name] =~ /.*\-testCheck/ {
          grok {
            id => "test-grok"
            match => {
              "message" => '%{DATESTAMP:time}\s*-\s*%{LOGLEVEL:log_level} \[%{GREEDYDATA:xb3.trace.id}\s*,\s*%{GREEDYDATA:xb3.span.id}\s*,\s*%{GREEDYDATA:trace.id}\]\s*%{JAVACLASS:java_class}\s*:\s*%{GREEDYDATA:message}'
            }
            overwrite => ["message", "log_level"]
          }
          grok {
            match => {
              "message" => '%{GREEDYDATA:a}tenantId\=\'%{USERNAME:tenant}\'%{GREEDYDATA:b}'
            }
            match => {
              "message" => '%{GREEDYDATA:a}realms\/%{USERNAME:tenant}\s*%{GREEDYDATA:b}'
            }
            match => {
              "message" => '%{GREEDYDATA:a}tenants\/%{USERNAME:tenant}\s*%{GREEDYDATA:b}'
            }
          }
          mutate {
            remove_field => ["a", "b"]
          }
        }
        if [log_group] =~ /.*testgroup.*/ {
          grok {
            match => {
              "message" => '%{POSINT:event_EpochTimestamp},\s*%{HOSTPORT:client_address},\s*%{GREEDYDATA:source_node},\s*%{USERNAME:type},\s*\[%{GREEDYDATA:id1}\],\s*\[%{GREEDYDATA:id2}\],\s*\"%{GREEDYDATA:a}\)\)\s*%{USERNAME:log_level}\s*%{GREEDYDATA:b}\s*Host:\s*%{GREEDYDATA:host}\s*Connection:\s*%{GREEDYDATA:c}\",\s*%{GREEDYDATA:query_type}\=%{GREEDYDATA:message}'
            }
            match => {
              "message" => '%{POSINT:event_EpochTimestamp},\s*%{HOSTPORT:client_address},\s*%{GREEDYDATA:source_node},\s*%{USERNAME:type},\s*\[%{GREEDYDATA:id1}\],\s*\[%{GREEDYDATA:id2}\],\s*\"%{GREEDYDATA:a}\)\)\s*%{USERNAME:log_level}\s*\/query\?%{GREEDYDATA:query_type}\=%{GREEDYDATA:message}'
            }
            overwrite => ["message", "log_level"]
          }
          date {
            match => [ "event_EpochTimestamp", "UNIX_MS" ]
            target => "event_Timestamp"
          }
          urldecode {
            field => "message"
          }
          mutate {
            gsub => [ "message", "\+", " " ]
          }
          grok {
            match => { "message" => '%{GREEDYDATA:d}\s*(?i)graph\s+\<%{URI:e}\>\s+%{GREEDYDATA:f}' }
            match => { "message" => '%{GREEDYDATA:d}\s*(?i)with\s+\<%{URI:e}\>\s+%{GREEDYDATA:f}' }
          }
          if [e] =~ /.+/ {
            ruby {
              code => "event.set('tenant', event.get('e').split('/')[-1])"
            }
          }
          mutate {
            remove_field => ["a", "b", "c", "d", "e", "f", "type", "id1", "id2"]
          }
        }
      }

    }

    output {
      elasticsearch {
        hosts => "https://${ELASTICSEARCH_HOST_PORT}"
        index => '%{[agent][type]}-%{+YYYY.MM.dd}-000001'
        cacert => "/certs/ca.crt"
        user => "${ELASTIC_USERNAME}"
        password => "${ELASTIC_PASSWORD}"
        #ilm_enabled => true
        #ilm_policy => "hotWarmDelete"
        #ilm_rollover_alias => "filebeat"
      }
    }

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.