Filebeat Event Publishing Delay to Kafka

The logger timestamps in the Filebeat logs show a noticeable gap between event processing and event publishing to Kafka. Because of this delay, events are not reaching Kafka in near real time as expected.

Kafka Output Configuration (unified for both sources)

output.kafka:
  hosts: ["${KAFKA_HOST_1}", "${KAFKA_HOST_2}", "${KAFKA_HOST_3}"]  # Updated to use the SSL-enabled broker
  topic: "${KAFKA_TOPIC}"
  key: '%{[UUID]}'
  codec.format:
    string: '%{[message]}'
  partition.hash:
    reachable_only: true
  required_acks: 1
  compression: gzip
  compression_level: 4
  max_message_bytes: 10485760
  version: "2.1.0"
  client_id: "filebeat-prod"
  bulk_max_size: 4096
  bulk_flush_frequency: 1s
  channel_buffer_size: 1024
  keep_alive: 30s
  max_retries: 5
  backoff.init: 2s
  backoff.max: 120s
  timeout: 60s
  broker_timeout: 20s

Enhanced Security Configuration

#sasl.mechanism: SCRAM-SHA-512
#username: "${KAFKA_USERNAME}"
#password: "${KAFKA_PASSWORD}"
security.protocol: PLAINTEXT
#ssl.enabled: true
#ssl.verification_mode: full
#ssl.certificate_authorities: ["/etc/filebeat/certs/ca-cert.pem"]
#ssl.verification_mode: none

Enhanced Logging Configuration

logging.level: debug  # Verbose logging for troubleshooting; use info for cleaner logs
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644

Console logging

logging.to_stderr: true  # Note: true enables stderr logging; set to false to disable it

Performance and Monitoring Configuration

Memory queue - will retry sending when Kafka comes back up

queue.mem:
  events: 32768  # Increased capacity to hold more events when Kafka is down
  flush.min_events: 2048
  flush.timeout: 5s

Hi,

We'd like to understand more on this issue, could you help clarify a few things?

  • How large is the delay exactly (seconds, tens of seconds, minutes)? You mentioned a "noticeable" delay, so we are trying to quantify it.
  • Is queue.events.count growing continuously?
  • Do you see any retry, backoff, or publish failures in the Filebeat logs?
  • Since you are queueing events on the Filebeat side for throughput and as a safety fallback, what is the CPU/IO usage on the Filebeat host during the delay?

These details will help determine whether this is batching behavior, Kafka backpressure, or resource constraints on the host.

You can also review the Filebeat internal metrics in the logs for deeper insight: https://www.elastic.co/docs/reference/beats/filebeat/understand-filebeat-logs. These metrics provide detailed visibility into Filebeat’s memory usage, CPU, input, processing, and output behavior, which should help identify where the delay is occurring.
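For example, the delay-relevant counters can be pulled out of the `/stats` monitoring response with a small script. This is a rough sketch, not part of Filebeat itself: it assumes the HTTP monitoring endpoint is enabled (`http.enabled: true`), and the sample values below are placeholders in the shape of a real `/stats` response.

```javascript
// Sketch: summarize delay-relevant counters from Filebeat's /stats endpoint.
// The `stats` object below is a trimmed, hypothetical excerpt of a response.
const stats = {
  libbeat: {
    output: {
      events: { acked: 0, active: 0, failed: 0, total: 0 },
      write: { latency: { histogram: { mean: 0, p99: 0 } } }
    },
    pipeline: {
      events: { active: 0, published: 0, retry: 0, total: 6 },
      queue: { filled: { pct: 0 }, max_events: 16384 }
    }
  }
};

function summarize(s) {
  const out = s.libbeat.output.events;
  const pipe = s.libbeat.pipeline;
  return {
    acked: out.acked,                    // events confirmed by Kafka
    pendingInPipeline: pipe.events.active, // events waiting to be published
    retries: pipe.events.retry,          // non-zero suggests Kafka backpressure
    queueFillPct: pipe.queue.filled.pct, // growing value: output can't keep up
    p99LatencyMs: s.libbeat.output.write.latency.histogram.p99
  };
}

console.log(summarize(stats));
```

If `queueFillPct` grows while `acked` stays flat, the bottleneck is on the output side; if both stay near zero, the delay is happening before events ever reach the queue.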

For me, every message is taking 13 to 20 seconds to reach Kafka:
[npc@nj01npctst4153] Dev $ curl http://10.42.41.53:5066/stats?pretty

{
  "beat": {
    "cgroup": {
      "memory": {
        "id": "session-18347.scope",
        "mem": {
          "limit": { "bytes": 9223372036854771712 },
          "usage": { "bytes": 270725120 }
        }
      }
    },
    "cpu": {
      "system": { "ticks": 60, "time": { "ms": 60 } },
      "total": { "ticks": 300, "time": { "ms": 300 }, "value": 300 },
      "user": { "ticks": 240, "time": { "ms": 240 } }
    },
    "handles": {
      "limit": { "hard": 262144, "soft": 262144 },
      "open": 12
    },
    "info": {
      "ephemeral_id": "45c66e3e-1b95-40f5-8761-e802cc0aea1a",
      "name": "filebeat",
      "uptime": { "ms": 43157 },
      "version": "8.17.0"
    },
    "memstats": {
      "gc_next": 38154024,
      "memory_alloc": 30649360,
      "memory_sys": 48715016,
      "memory_total": 71259320,
      "rss": 120840192
    },
    "runtime": { "goroutines": 37 }
  },
  "filebeat": {
    "events": { "active": 0, "added": 6, "done": 6 },
    "harvester": { "closed": 0, "open_files": 0, "running": 0, "skipped": 0, "started": 0 },
    "input": { "log": { "files": { "renamed": 0, "truncated": 0 } } }
  },
  "libbeat": {
    "config": {
      "module": { "running": 0, "starts": 0, "stops": 0 },
      "reloads": 0,
      "scans": 0
    },
    "output": {
      "batches": { "split": 0 },
      "events": {
        "acked": 0,
        "active": 0,
        "batches": 0,
        "dead_letter": 0,
        "dropped": 0,
        "duplicates": 0,
        "failed": 0,
        "toomany": 0,
        "total": 0
      },
      "read": { "bytes": 0, "errors": 0 },
      "type": "kafka",
      "write": {
        "bytes": 0,
        "errors": 0,
        "latency": {
          "histogram": {
            "count": 0,
            "max": 0,
            "mean": 0,
            "median": 0,
            "min": 0,
            "p75": 0,
            "p95": 0,
            "p99": 0,
            "p999": 0,
            "stddev": 0
          }
        }
      }
    },
    "pipeline": {
      "clients": 2,
      "events": {
        "active": 0,
        "dropped": 0,
        "failed": 0,
        "filtered": 6,
        "published": 0,
        "retry": 0,
        "total": 6
      },
      "queue": {
        "acked": 0,
        "added": { "bytes": 0, "events": 0 },
        "consumed": { "bytes": 0, "events": 0 },
        "filled": { "bytes": 0, "events": 0, "pct": 0 },
        "max_bytes": 0,
        "max_events": 16384,
        "removed": { "bytes": 0, "events": 0 }
      }
    }
  },
  "registrar": {
    "states": { "cleanup": 0, "current": 7, "update": 6 },
    "writes": { "fail": 0, "success": 1, "total": 1 }
  },
  "system": {
    "cpu": { "cores": 4 },
    "load": {
      "1": 0.11,
      "15": 0.26,
      "5": 0.21,
      "norm": { "1": 0.0275, "15": 0.065, "5": 0.0525 }
    }
  }
}

Hi @Thirupathi

What does the source look like? What are you harvesting?

Have you isolated this to the Kafka output?

Have you tried the Filebeat console output or file output, to see whether you get the same behavior?

It does not look like you are reading much data.

Can you provide more context on how you are seeing this difference? Which time fields are you comparing? Can you share a screenshot of Kibana with the relevant date fields?
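To isolate the Kafka output, one option is to temporarily swap it for the file output and compare timestamps there. A minimal sketch (the path is an example; only one output may be enabled at a time in Filebeat):

```yaml
# Temporary isolation test: comment out output.kafka and use the file output.
output.file:
  path: "/tmp/filebeat-test"      # example directory; must exist
  filename: filebeat-out
  # rotate_every_kb: 20000        # raise the rotation size for large events
```

If events appear in the file promptly but reach Kafka late, the delay is in the output; if they are late in the file too, look at the input/processing side.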

Can we schedule a call today to discuss this?

Hi @Thirupathi

Perhaps there is a little confusion.
This is a public discussion forum, not support; we don't schedule calls.

If you want help you will need to interact through the forum.

The best approach is to answer the questions that were asked in detail.

Sometimes issues are not resolved.

We are observing approximately a 20-second delay in message delivery from Filebeat to Kafka.

Filebeat is successfully sending messages to Kafka, but there is a noticeable delay before the messages are published to the Kafka topic. This delay is impacting near real-time log processing.

Please check whether anything can be improved. Our configuration is below:

filebeat.inputs:
  # Input 1: Kafka Error Logs (from ELK configuration)
  - type: log
    enabled: true
    paths:
      - /home/npc/logs/kafkaError.log
      - /home/npc/logs/kafkaError.log.*
    scan_frequency: 5s         # Check for new/rotated files every 5 seconds
    close_inactive: 10m        # Close files if no new log lines for 10 minutes
    #ignore_older: 24h          # Stop reading files older than 24 hours
    fields:
      log_source: "kafka_error"
    fields_under_root: true
    multiline.pattern: '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}'
    multiline.negate: true
    multiline.match: after

  # Input 2: Apache ModSecurity Audit Logs (from Kafka configuration)
  - type: log
    enabled: true
    paths:
      - /opt/jbcs-httpd24-2.4/httpd/logs/modsec_audit.log
      - /opt/jbcs-httpd24-2.4/httpd/logs/modsec_audit.log.*
    harvester_buffer_size: 1073741824
    message_max_bytes: 1073741824
    multiline.max_lines: 1073741824
    scan_frequency: 5s         # Check for new/rotated files every 5 seconds
    close_inactive: 10m        # Close files if no new log lines for 10 minutes
    #ignore_older: 24h          # Stop reading files older than 24 hours
    fields:
      log_source: "apache_modsec"
    fields_under_root: true
    multiline.pattern: '^--[a-z0-9]+-A--$'
    multiline.negate: true
    multiline.match: after
    # Exclude NPCWebGUI and NPCAdminGUI requests - don't send to Kafka
    exclude_lines: ['/NPCWebGUI/', '/NPCAdminGUI/']

processors:
  # Processor for Kafka Error Logs
  - if:
      equals:
        log_source: "kafka_error"
    then:
      - script:
          lang: javascript
          source: |
           function process(event) {
              try {
                var logMessage = event.Get("message");
                if (!logMessage || typeof logMessage !== 'string') {
                  event.Put("processing_error", "No valid message field found");
                  return;
                }

                // Extract JSON substring from the log line
                var start = logMessage.indexOf("{");
                var end = logMessage.lastIndexOf("}");
                if (start === -1 || end === -1 || end <= start) {
                  event.Put("processing_error", "No valid JSON structure found");
                  return;
                }

                var rawJson = logMessage.substring(start, end + 1);

                try {
                  var parsed = JSON.parse(rawJson);

                  // Check if this is the nested structure with message.value and message.key
                  if (parsed.message && parsed.message.value && parsed.message.key) {
                    // Extract the key for UUID
                    var key = parsed.message.key;

                    // Parse the inner JSON from the value field
                    try {
                      var innerJson = JSON.parse(parsed.message.value);
                      // Set the inner JSON as the message content
                      event.Put("message", JSON.stringify(innerJson));
                      event.Put("UUID", key);
                      event.Put("processing_status", "success_nested");
                    } catch (innerE) {
                      // If inner JSON parsing fails, use the value as-is
                      event.Put("message", parsed.message.value);
                      event.Put("UUID", key);
                      event.Put("processing_status", "success_simple");
                    }

                  } else if (parsed.message) {
                    // Handle direct message structure
                    event.Put("message", JSON.stringify(parsed.message));

                    // Try to extract key for UUID
                    var keyMatch = rawJson.match(/"key"\s*:\s*"([^"]*)"/);
                    var key = keyMatch ? keyMatch[1] : "N/A";
                    event.Put("UUID", key);
                    event.Put("processing_status", "success_direct");

                  } else {
                    // Handle any other JSON structure
                    event.Put("message", JSON.stringify(parsed));

                    // Try to extract key for UUID
                    var keyMatch = rawJson.match(/"key"\s*:\s*"([^"]*)"/);
                    var key = keyMatch ? keyMatch[1] : "N/A";
                    event.Put("UUID", key);
                    event.Put("processing_status", "success_generic");
                  }

                } catch (parseError) {
                  // Fallback: extract key and put full JSON string
                  var keyMatch = rawJson.match(/"key"\s*:\s*"([^"]*)"/);
                  var key = keyMatch ? keyMatch[1] : "N/A";
                  event.Put("message", rawJson);
                  event.Put("UUID", key);
                  event.Put("processing_status", "fallback");
                  event.Put("processing_error", "JSON parse error: " + parseError.message);
                }

              } catch (generalError) {
                event.Put("processing_error", "General processing error: " + generalError.message);
                event.Put("processing_status", "error");
              }
            }
# Processor for Apache ModSecurity Logs
  - if:
      equals:
        log_source: "apache_modsec"
    then:
      - dissect:
          tokenizer: "%{log}"
          field: "message"
      - script:
          lang: javascript
          source: |
              function unescapeXml(str) {
                return str.replace(/&lt;/g, "<").replace(/&gt;/g, ">").replace(/&amp;/g, "&").replace(/&quot;/g, '"').replace(/&#39;/g, "'");
              }

              function extractSection(log, section) {
                var hashMatch = log.match(/--([a-z0-9]+)-A--/);
                if (!hashMatch) return "N/A";
                var hash = hashMatch[1];
                var startMarker = "--" + hash + "-" + section + "--";
                var startIdx = log.indexOf(startMarker);
                if (startIdx === -1) return "N/A";
                var afterStart = log.indexOf('\n', startIdx);
                if (afterStart === -1) return "N/A";
                afterStart++;
                var nextSection = log.slice(afterStart).match(/--[a-z0-9]+-[A-Z]--/);
                var endIdx = nextSection ? afterStart + nextSection.index : log.length;
                return log.substring(afterStart, endIdx).trim();
              }

              function process(event) {
                var logMessage = event.Get("message");
                var now = new Date();

                function pad(n, width) {
                  width = width || 2;
                  var s = n.toString();
                  return new Array(width - s.length + 1).join(0) + s;
                }

                function add(n) {
                  return (n < 10 ? '0' : '') + n;
                }

                var tmp_currentTimestamp = now.getFullYear().toString()
                  + add(now.getMonth() + 1)
                  + add(now.getDate())
                  + add(now.getHours())
                  + add(now.getMinutes())
                  + add(now.getSeconds());
                var randomNum = Math.floor(Math.random() * 10000000000);
                var randWithTime = randomNum + "_" + tmp_currentTimestamp;

                var httpsMatch = logMessage.match(/--[a-z0-9]+-F--\s*HTTP\/1\.1\s+([^\n\r]+)/i);
                var status = (httpsMatch && httpsMatch[1]) ? httpsMatch[1].substring(0, 3) : "N/A";
                if (status === "N/A") {
                var fSection = extractSection(logMessage, "F");
                if (fSection !== "N/A") {
                      var statusMatch = fSection.match(/HTTP\/[\d.]+\s+(\d{3})/i);
                      status = (statusMatch && statusMatch[1]) ? statusMatch[1] : "500";
                    }else {
                      status = "500";
                    }
                }

                var hostMatch = logMessage.match(/Host: ([^\n]+)/);
                var urlMatch = logMessage.match(/--[a-z0-9]+-B--\nPOST ([^ ]+) /);
                var aSectionMatch = logMessage.match(/--[a-z0-9]+-A--\n([\s\S]*?)--[a-z0-9]+-B--/i);
                var sourceIp = "N/A";
                if (aSectionMatch && aSectionMatch[1]) {
                  var ipMatch = aSectionMatch[1].match(/([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)/);
                  if (ipMatch) {
                    sourceIp = ipMatch[1];
                  }
                }

                // Extract request and response bodies using helper function
                var requestBody = extractSection(logMessage, "C");
                var responseBody = extractSection(logMessage, "E");

                // Extract NPCData block from request
                if (requestBody !== "N/A") {
                  var xmlMatch = requestBody.match(/<NPCData[\s\S]*?<\/NPCData>/i);
                  if (!xmlMatch) {
                    var unescaped = unescapeXml(requestBody);
                    xmlMatch = unescaped.match(/<NPCData[\s\S]*?<\/NPCData>/i);
                  }
                  if (xmlMatch) {
                    requestBody = xmlMatch[0];
                  }
                }
                event.Put("debug_NPCData", requestBody);

                // Unescape XML in response
                if (responseBody !== "N/A") {
                  responseBody = unescapeXml(responseBody);
                }

                // ENHANCED: Clean multipart form data headers from request body
                if (requestBody !== "N/A") {
                  // Remove Content-Disposition headers
                  requestBody = requestBody.replace(/^Content-Disposition:.*(\r?\n)?/gm, '').trim();
                  // Remove boundary lines
                  requestBody = requestBody.replace(/^[-]{2,}.*$/gm, '').trim();

                  // ENHANCED: Handle multiple Content-Disposition blocks (keep only first part)
                  var firstIdx = requestBody.indexOf('Content-Disposition:');
                  if (firstIdx !== -1) {
                    var secondIdx = requestBody.indexOf('Content-Disposition:', firstIdx + 1);
                    if (secondIdx !== -1) {
                      requestBody = requestBody.substring(0, secondIdx).trim();
                    }
                  }

                  // ENHANCED: Extract only JSON block if present (critical for clean data)
                  var jsonMatch = requestBody.match(/\{[\s\S]*\}/);
                  if (jsonMatch && jsonMatch[0]) {
                    requestBody = jsonMatch[0].trim();
                    // Remove everything from 'Content-Type:' onward if it somehow remains
                    var ctIdx = requestBody.indexOf('Content-Type:');
                    if (ctIdx !== -1) {
                      requestBody = requestBody.substring(0, ctIdx).trim();
                    }
                  }
                }

                function extractField(body, field) {
                  var m = body.match(new RegExp('<' + field + '>([^<]+)</' + field + '>', 'i')) ||
                    body.match(new RegExp('&lt;' + field + '&gt;([\\s\\S]*?)&lt;/' + field + '&gt;', 'i')) ||
                    body.match(new RegExp('"' + field + '"\\s*:\\s*"([^"]+)', 'i')) ||
                    body.match(new RegExp('"' + field + '"\\s*:\\s*\\[\\s*"([^"]+)"', 'i'));
                  return (m && m[1]) ? m[1].trim() : "N/A";
                }

                function getMessageID(body) {
                  var m = body.match(/<MessageID>([^<]+)<\/MessageID>/) ||
                    body.match(/&lt;MessageID&gt;([^&]+)&lt;\/MessageID&gt;/) ||
                    body.match(/"MessageID"\s*:\s*"([^"]+)"/);
                  return (m && m[1]) ? m[1] : "N/A";
                }

                var messageId = getMessageID(requestBody);
                if (messageId === "N/A") messageId = getMessageID(responseBody);
                if (messageId === "N/A") messageId = extractField(requestBody, "auditMessageID");
                if (messageId === "N/A") messageId = extractField(responseBody, "auditMessageID");

                var portingArea = extractField(requestBody, "PortingArea");
                if (portingArea === "N/A" || !portingArea) {
                  portingArea = extractField(responseBody, "PortingArea");
                }

                var portType = extractField(requestBody, "PortType");
                if (portType === "N/A" || !portType) {
                  portType = extractField(responseBody, "PortType");
                }

                var msgCreateTimeStamp = extractField(requestBody, "MsgCreateTimeStamp");
                if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
                  msgCreateTimeStamp = extractField(responseBody, "MsgCreateTimeStamp");
                }
                if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
                  msgCreateTimeStamp = extractField(responseBody, "timeStamp");
                }
                if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
                  msgCreateTimeStamp = extractField(requestBody, "timeStamp");
                }

                var sender = extractField(requestBody, "Sender");
                if (sender === "N/A" || !sender) {
                  sender = extractField(responseBody, "Sender");
                }
                if (sender === "N/A" || !sender) {
                  sender = extractField(requestBody, "userId");
                }

                var contentTypeMatch = logMessage.match(/Content-Type:\s*([^\n\r]+)/);
                var contentType = contentTypeMatch ? contentTypeMatch[1].toLowerCase() : "N/A";

                var portId = extractField(requestBody, "portId");
                if (portId === "N/A") portId = extractField(responseBody, "portId");
                if (portId === "N/A") portId = extractField(requestBody, "TransactionID");
                if (portId === "N/A") {
                  portId = extractField(responseBody, "TransactionID");
                  if (portId === "N/A" && responseBody.indexOf("TransactionID") !== -1) {
                    event.Put("debug_transactionid_search", "TransactionID found but not extracted from response");
                  }
                }

                var number = extractField(requestBody, "Number");
                if ((!number || number === "N/A") && responseBody && responseBody !== "N/A") {
                  number = extractField(responseBody, "Number");
                }
                if (!number || number === "N/A") {
                  number = extractField(requestBody, "NumberFrom");
                  if ((!number || number === "N/A") && responseBody && responseBody !== "N/A") {
                    number = extractField(responseBody, "NumberFrom");
                  }
                }

                function replaceAttachedFiles(xmlContent) {
                  return xmlContent.replace(/<ws:attachedFiles>[^<]*<\/ws:attachedFiles>/g, "");
                }

                function maskJwtToken(resp) {
                  try {
                    return resp.replace(/"jwtToken"\s*:\s*"([^"]+)"/, function(match, token) {
                      if (token.length > 4) {
                        var masked = "*******";
                        return '"jwtToken":"' + masked + '"';
                      } else {
                        return '"jwtToken":"' + "*".repeat(token.length) + '"';
                      }
                    });
                  } catch (e) {
                    return resp;
                  }
                }

                function maskPassword(resp) {
                  try {
                    return resp.replace(/"password"\s*:\s*"([^"]+)"/, function(match, password) {
                      return '"password":"' + "*******" + '"';
                    });
                  } catch(e) {
                    return resp;
                  }
                }

                // Apply masking and cleanup to request body
                if (requestBody !== "N/A") {
                  requestBody = replaceAttachedFiles(requestBody);
                  requestBody = maskPassword(requestBody);
                }

                // Apply masking and cleanup to response body
                if (responseBody !== "N/A") {
                  responseBody = maskJwtToken(responseBody);
                  responseBody = maskPassword(responseBody);
                  responseBody = replaceAttachedFiles(responseBody);
                }

                var uuid = (portId !== "N/A") ? portId : ((number !== "N/A") ? number : randWithTime); // prefer portId, then number, else generated id
                var request = (requestBody && requestBody.trim()) ? requestBody.trim() : "N/A";
                var response = (responseBody && responseBody.trim()) ? responseBody.trim() : "N/A";

                // Mask fileContent in JSON response
                if (response !== "N/A" && response.indexOf('"fileContent"') !== -1) {
                  response = response.replace(/("fileContent"\s*:\s*")([\s\S]*?)(")/, '$1N/A$3');
                }

                var hostValue = hostMatch && hostMatch[1] ? hostMatch[1].split(',')[0].trim() : "N/A";
                var url = (hostValue !== "N/A" && urlMatch && urlMatch[1]) ? ("https://" + hostValue + urlMatch[1]) : "N/A";

                var jsonObject = {
                  "header": {
                    "messageId": randWithTime,
                    "timeStamp": tmp_currentTimestamp.toString(),
                    "source": "npc-apache-server",
                    "version": "1.0",
                    "type": status === "200" ? "npc-apache-success" : "npc-apache-error"
                  },
                  "payload": {
                    "portingArea": portingArea,
                    "portType": portType,
                    "messageId": messageId,
                    "msgCreateTimeStamp": msgCreateTimeStamp,
                    "senderOperator": sender,
                    "httpsStatusCode": status,
                    "portId": portId,
                    "firstPhoneNumber": number,
                    "sourceIp": sourceIp,
                    "url": url,
                    "request": request,
                    "response": response
                  }
                };

                event.Put("message", JSON.stringify(jsonObject));
                event.Put("UUID", uuid);
              }

  

# Kafka Output Configuration (unified for both sources)
output.kafka:
  hosts: ["${KAFKA_HOST_1}", "${KAFKA_HOST_2}", "${KAFKA_HOST_3}"]  # Updated to use the SSL-enabled broker
  topic: "${KAFKA_TOPIC}"
  key: '%{[UUID]}'
  codec.format:
    string: '%{[message]}'
  partition.hash:
    reachable_only: true
  required_acks: 1
  compression: gzip
  compression_level: 4
  max_message_bytes: 10485760
  version: "2.1.0"
  client_id: "filebeat-prod"
  bulk_max_size: 4096
  bulk_flush_frequency: 1s
  channel_buffer_size: 1024
  keep_alive: 30s
  max_retries: 5
  backoff.init: 2s
  backoff.max: 120s
  timeout: 60s
  broker_timeout: 20s

  # Enhanced Security Configuration
  #sasl.mechanism: SCRAM-SHA-512
  #username: "${KAFKA_USERNAME}"
  #password: "${KAFKA_PASSWORD}"
  security.protocol: PLAINTEXT
  #ssl.enabled: true
  #ssl.verification_mode: full
  #ssl.certificate_authorities: ["/etc/filebeat/certs/ca-cert.pem"]
  #ssl.verification_mode: none
 

# Enhanced Logging Configuration
logging.level: debug  # Verbose logging for troubleshooting; use info for cleaner logs
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644


# Console logging (note: true enables stderr logging; set to false to disable it)
logging.to_stderr: true

# Performance and Monitoring Configuration

# Memory queue - will retry sending when Kafka comes back up
queue.mem:
  events: 32768  # Increased capacity to hold more events when Kafka is down
  flush.min_events: 2048
  flush.timeout: 5s
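Note that with the settings above, a batch can sit in the memory queue for up to flush.timeout (5s) whenever fewer than flush.min_events (2048) events are pending, and the Kafka output adds up to bulk_flush_frequency (1s) of batching on top. On a low-volume source this alone accounts for several seconds of per-event latency. A lower-latency sketch (a suggestion to try, not a confirmed fix):

```yaml
queue.mem:
  events: 32768
  flush.min_events: 1    # do not wait to accumulate a large batch
  flush.timeout: 1s      # flush at least once per second
```

In the Kafka output, setting bulk_flush_frequency to 0s removes the extra output-side wait, at the cost of smaller, more frequent produce requests.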

As asked before, it is not clear how you are measuring this or where you are seeing the delay. You say there is a delay between Filebeat and Kafka, but nothing you have shared so far indicates an issue; there is no evidence of any delay.

We may be able to help you, but you need to share context.

How are you measuring this? Where are you seeing the delay? Where does your data go after Kafka? Please provide context and evidence of the delay.

Hi,
I am seeing the errors below:

{"log.level":"debug","@timestamp":"2026-02-24T12:47:58.584+0400","log.logger":"reader_multiline","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/reader/multiline.(*patternReader).readNext","file.name":"multiline/pattern.go","file.line":170},"message":"Multiline event flushed because timeout reached.","service.name":"filebeat","ecs.version":"1.6.0"}
2026-02-24 12:48:00.248780077 +0400 +04 m=+17.349892097 write error: data size (18887644 bytes) is greater than the max file size (5242880 bytes)
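The first message is a debug notice: a multiline group was flushed because the multiline timeout (5s by default) expired before another matching line arrived. If your measured per-message delay includes that hold time, lowering the timeout on each input is worth trying. A sketch, not a confirmed fix:

```yaml
# Per-input setting, placed alongside the existing multiline.* options.
# Default is 5s; a lower value flushes the last event of a burst sooner.
multiline.timeout: 2s
```

The second message suggests an event of roughly 18 MB exceeded a 5 MB file-size cap somewhere in the pipeline; which component enforces that limit is not clear from the log line alone, so it is worth checking any file output or rotation settings in play.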