Filebeat Unable to Handle Single Large Event (~5GB)

We are facing an issue where Filebeat is unable to process or hold a single large event of approximately 5 GB.

Due to this limitation, the event is not being shipped successfully to Elasticsearch. This appears to be a Filebeat-side constraint on handling very large individual events rather than multiple smaller events.

Could you please review this issue and advise on:

  • The maximum supported size for a single event in Filebeat
  • Possible configuration changes or alternatives for handling large events
  • The recommended approach to split or otherwise process such large events

Please let us know if any additional information or logs are required from our side.

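For quick reference, the size-related settings we have already tried to raise are summarised below. The values are taken from our full configuration (pasted further down) and are shown only to illustrate what we attempted, not as a known-good fix:

```yaml
filebeat.inputs:
  - type: log
    # ...
    harvester_buffer_size: 1073741824   # read buffer size, in bytes
    max_bytes: 1073741824               # maximum size of a single event, in bytes
    multiline.max_lines: 1073741824     # maximum number of lines merged into one event

output.kafka:
  # ...
  max_message_bytes: 1000000            # maximum size of a single Kafka record, in bytes
```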

filebeat.inputs:
  # Input 1: Kafka Error Logs (from ELK configuration)
  - type: log
    enabled: true
    paths:
      - /home/npc/logs/kafkaError.log
      - /home/npc/logs/kafkaError.log.*
    scan_frequency: 5s         # Check for new/rotated files every 5 seconds
    close_inactive: 10m        # Close files if no new log lines for 10 minutes
    #ignore_older: 24h          # Stop reading files older than 24 hours
    fields:
      log_source: "kafka_error"
    fields_under_root: true
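    # A new event starts at a line beginning with a timestamp such as "2024-05-01 10:15:30,123";
    # lines that do not match are appended to the previous event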
    multiline.pattern: '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}'
    multiline.negate: true
    multiline.match: after
 
  # Input 2: Apache ModSecurity Audit Logs (from Kafka configuration)
  - type: log
    enabled: true
    paths:
      - /opt/jbcs-httpd24-2.4/httpd/logs/modsec_audit.log
      - /opt/jbcs-httpd24-2.4/httpd/logs/modsec_audit.log.*
    harvester_buffer_size: 1073741824   # Read buffer size in bytes (1 GiB)
    max_bytes: 1073741824               # Maximum size in bytes allowed for a single event
    multiline.max_lines: 1073741824     # Maximum number of lines merged into one event (a line count, not bytes)
    scan_frequency: 5s         # Check for new/rotated files every 5 seconds
    close_inactive: 10m        # Close files if no new log lines for 10 minutes
    #ignore_older: 24h          # Stop reading files older than 24 hours
    fields:
      log_source: "apache_modsec"
    fields_under_root: true
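    # A new event starts at a ModSecurity audit part-A boundary (e.g. "--832c02c5-A--");
    # everything up to the next boundary is treated as one event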
    multiline.pattern: '^--[a-z0-9]+-A--$'
    multiline.negate: true
    multiline.match: after
    # Exclude NPCWebGUI and NPCAdminGUI requests - don't send to Kafka
    exclude_lines: ['/NPCWebGUI/', '/NPCAdminGUI/']
 
# Processor for Apache ModSecurity Logs
processors:
  - if:
      equals:
        log_source: "apache_modsec"
    then:
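      # Capture the entire raw message into a single dissected field before the script runs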
      - dissect:
          tokenizer: "%{log}"
          field: "message"
      - script:
          lang: javascript
          source: |
            // The javascript processor invokes process(event) once per event.
            function process(event) {
              // Raw multiline audit entry (field name "message" assumed; adjust if the dissect target differs)
              var logMessage = event.Get("message") || "";
              // Helper used below: decode common XML entities so escaped payloads can still be matched
              function unescapeXml(s) {
                return s.replace(/&lt;/g, "<")
                        .replace(/&gt;/g, ">")
                        .replace(/&quot;/g, '"')
                        .replace(/&apos;/g, "'")
                        .replace(/&amp;/g, "&");
              }
              // Extract status code like "200 OK" or others
              var httpsMatch = logMessage.match(/--[a-z0-9]+-F--\s*HTTP\/1\.1\s+([^\n\r]+)/i);
              var status = httpsMatch ? httpsMatch[1] : "";
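              // Host header, and the POST request path from the --B-- (request headers) section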
              var hostMatch = logMessage.match(/Host: ([^\n]+)/);
              var urlMatch = logMessage.match(/--[a-z0-9]+-B--\nPOST ([^ ]+) /);
              var aSectionMatch = logMessage.match(/--[a-z0-9]+-A--\n([\s\S]*?)--[a-z0-9]+-B--/i);
              var sourceIp = "N/A";
              if (aSectionMatch && aSectionMatch[1]) {
                // Find the first IPv4 address in the --A-- section
                var ipMatch = aSectionMatch[1].match(/([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)/);
                if (ipMatch) {
                  sourceIp = ipMatch[1];
                }
              }
              // Extract requestBody between --C-- and --F-- (Request body - XML/JSON)
              var requestMatch = logMessage.match(/--[a-z0-9]+-C--\n([\s\S]*?)\n--[a-z0-9]+-F--/);
              var requestBody = requestMatch ? requestMatch[1].trim() : "N/A";
 
              // Robust extraction of <NPCData> block
              if (requestBody !== "N/A") {
                var xmlMatch = requestBody.match(/<NPCData[\s\S]*?<\/NPCData>/i);
                if (!xmlMatch) {
                  var unescaped = unescapeXml(requestBody);
                  xmlMatch = unescaped.match(/<NPCData[\s\S]*?<\/NPCData>/i);
                }
                if (xmlMatch) {
                  requestBody = xmlMatch[0];
                }
              }
              event.Put("debug_NPCData", requestBody);
              // Extract responseBody between --E-- and --H-- (Response body - XML/JSON)
              var responseMatch = logMessage.match(/--[a-z0-9]+-E--\n([\s\S]*?)\n--[a-z0-9]+-H--/);
              var responseBody = responseMatch ? responseMatch[1].trim() : "N/A";
              if (responseBody !== "N/A") {
                responseBody = unescapeXml(responseBody);
              }
               // Remove multipart/form-data headers (e.g., Content-Disposition) if present
              if (requestBody !== "N/A") {
                // Remove lines like 'Content-Disposition: ...' and any boundary lines
                requestBody = requestBody.replace(/^Content-Disposition:.*(\r?\n)?/gm, '').trim();
                // Remove any boundary lines (e.g., ---------------------...)
                requestBody = requestBody.replace(/^[-]{2,}.*$/gm, '').trim();
              }
              // Extract fields from JSON, XML, or escaped XML
              function extractField(body, field) {
                var m = body.match(new RegExp('<' + field + '>([^<]+)</' + field + '>', 'i')) ||
                        body.match(new RegExp('&lt;' + field + '&gt;([\\s\\S]*?)&lt;/' + field + '&gt;', 'i')) ||
                        body.match(new RegExp('"' + field + '"\\s*:\\s*"([^"]+)"', 'i'));
                return (m && m[1]) ? m[1].trim() : "N/A";
              }
 
              // Extract MessageID (case-sensitive)
              function getMessageID(body) {
                var m = body.match(/<MessageID>([^<]+)<\/MessageID>/) ||
                        body.match(/&lt;MessageID&gt;([^&]+)&lt;\/MessageID&gt;/) ||
                        body.match(/"MessageID"\s*:\s*"([^"]+)"/);
                return (m && m[1]) ? m[1] : "N/A";
              }
 
              var messageId = getMessageID(requestBody);
              if (messageId === "N/A") messageId = getMessageID(responseBody);
              if (messageId === "N/A") messageId = extractField(requestBody, "auditMessageID");
              if (messageId === "N/A") messageId = extractField(responseBody, "auditMessageID");
              var portingArea = extractField(requestBody, "PortingArea");
              if (portingArea === "N/A" || !portingArea) {
                portingArea = extractField(responseBody, "PortingArea");
              }
              var portType = extractField(requestBody, "PortType");
              if (portType === "N/A" || !portType) {
                portType = extractField(responseBody, "PortType");
              }
              var msgCreateTimeStamp = extractField(requestBody, "MsgCreateTimeStamp");
              if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
                msgCreateTimeStamp = extractField(responseBody, "MsgCreateTimeStamp");
              }
              if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
                msgCreateTimeStamp = extractField(responseBody, "timeStamp");
              }
              if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
                msgCreateTimeStamp = extractField(requestBody, "timeStamp");
              }
              var sender = extractField(requestBody, "Sender");
              if (sender === "N/A" || !sender) {
                sender = extractField(responseBody, "Sender");
              }
              if (sender === "N/A" || !sender) {
                sender = extractField(requestBody, "userId");
              }
              // Extract Content-Type to determine transport type (if still needed in other ways)
              var contentTypeMatch = logMessage.match(/Content-Type:\s*([^\n\r]+)/);
              var contentType = contentTypeMatch ? contentTypeMatch[1].toLowerCase() : "N/A";
 
              // Extract portId or TransactionID
              var portId = extractField(requestBody, "portId");
              if (portId === "N/A") portId = extractField(responseBody, "portId");
              if (portId === "N/A") portId = extractField(requestBody, "TransactionID");
              if (portId === "N/A") {
                portId = extractField(responseBody, "TransactionID");
                // Debug: log if TransactionID extraction fails
                if (portId === "N/A" && responseBody.indexOf("TransactionID") !== -1) {
                  event.Put("debug_transactionid_search", "TransactionID found but not extracted from response");
                }
              }
 
 
              function maskPassword(resp) {
                try {
                  return resp.replace(/"password"\s*:\s*"([^"]+)"/, function(match, password) {
                    return '"password":"' + "*******" + '"';
                  });
                } catch(e) {
                  return resp;
                }
              }
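              // The processed record (jsonObject) and a per-event identifier (uuid) replace the
              // original message below; uuid is also used as the Kafka message key in the output section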
 
 
              // Replace the original message with the processed JSON object
              event.Put("message", JSON.stringify(jsonObject));
              event.Put("UUID", uuid);
            }
 
  
 
# Kafka Output Configuration (unified for both sources)
output.kafka:
  hosts: ["${KAFKA_HOST_1}", "${KAFKA_HOST_2}", "${KAFKA_HOST_3}"]  # SSL-enabled brokers
  topic: "${KAFKA_TOPIC}"
  key: '%{[UUID]}'
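  # Send only the processed "message" field as the record value, instead of the full Beats JSON document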
  codec.format:
    string: '%{[message]}'
  partition.hash:
    reachable_only: true
  required_acks: 1
  compression: gzip
  compression_level: 4
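  # Maximum size in bytes of a single record sent to Kafka; larger events are dropped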
  max_message_bytes: 1000000
  version: "2.1.0"
  client_id: "filebeat-prod"
  bulk_max_size: 4096
  bulk_flush_frequency: 1s
  channel_buffer_size: 1024
  keep_alive: 30s
  max_retries: 5
  backoff.init: 2s
  backoff.max: 120s
  timeout: 60s
  broker_timeout: 20s
 
  # Enhanced Security Configuration
  sasl.mechanism: SCRAM-SHA-512
  username: "${KAFKA_USERNAME}"
  password: "${KAFKA_PASSWORD}"
  security.protocol: SASL_SSL
  ssl.enabled: true
  #ssl.verification_mode: full
  ssl.certificate_authorities: ["/etc/filebeat/certs/ca-cert.pem"]
  #ssl.verification_mode: none

 
# Enhanced Logging Configuration
logging.level: debug  # Verbose logging while we troubleshoot this issue
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644
 
 
# Also mirror logs to stderr
logging.to_stderr: true
 
# Performance and Monitoring Configuration
 
# Memory queue - will retry sending when Kafka comes back up
queue.mem:
  events: 32768  # Increased capacity to hold more events when Kafka is down
  flush.min_events: 2048
  flush.timeout: 5s

Thanks,
Thirupathi

Hi @Thirupathi, there is one thing I'm a bit confused about: are you talking about a single file that is 5 GB (thousands of lines), or a single line (event) of 5 GB (no line break \n anywhere within those 5 GB of data)?

Also, could you format your config as a code block so it is easier to read/check? You can put the YAML within three backticks, like this:

```yaml
filebeat.inputs:
  - type: filestream
    id: example-1
    paths:
      - /foo/bar/*.log

# A yaml comment
output.discard:
  enabled: true
```