Filebeat OOM (Out Of Memory) Error

  • harvester_buffer_size: 5368709120
  • message_max_bytes: 5368709120
  • multiline.max_lines: 5368709120
    I configured the above settings in Filebeat; after that I am getting the issue below (the log line is truncated at the start): mp":"2025-11-30T05:04:30.546-0500","log.logger":"input.harvester","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/input/log.(*Harvester).newLogFileReader","file.name":"log/harvester.go","file.line":655},"message":"newLogFileReader with config.MaxBytes: 10485760","service.name":"filebeat","input_id":"dd0ea89d-927a-46bb-b468-803a1765fc3c","source_file":"/home/npc/logs/kafkaError.log","state_id":"native::21560394-64772","finished":false,"os_id":"21560394-64772","old_source":"/home/npc/logs/kafkaError.log","old_finished":true,"old_os_id":"21560394-64772","harvester_id":"ccfa40b1-56ab-4186-8e13-eea5c46d605d","ecs.version":"1.6.0"}
    fatal error: runtime: out of memory

runtime stack:
runtime.throw({0x5602667b2f0a?, 0x203050?})
runtime/panic.go:1023 +0x5e fp=0x7f9104ff8c08 sp=0x7f9104ff8bd8 pc=0x56026220bede
runtime.sysMapOS(0xc002800000, 0x140000000)
runtime/mem_linux.go:167 +0x11b fp=0x7f9104ff8c48 sp=0x7f9104ff8c08 pc=0x5602621e873b
runtime.sysMap(0xc002800000, 0x140000000, 0x56026af11128?)
runtime/mem.go:155 +0x34 fp=0x7f9104ff8c68 sp=0x7f9104ff8c48 pc=0x5602621e8174
runtime.(*mheap).grow(0x56026af00f20, 0xa0000?)
runtime/mheap.go:1534 +0x236 fp=0x7f9104ff8cd8 sp=0x7f9104ff8c68 pc=0x5602621fbad6
runtime.(*mheap).allocSpan(0x56026af00f20, 0xa0000, 0x0, 0x1)
runtime/mheap.go:1246 +0x1b0 fp=0x7f9104ff8d78 sp=0x7f9104ff8cd8 pc=0x5602621fb1b0
runtime.(*mheap).alloc.func1()
runtime/mheap.go:964 +0x5c fp=0x7f9104ff8dc0 sp=0x7f9104ff8d78 pc=0x5602621fac5c
runtime.systemstack(0x800000)
runtime/asm_amd64.s:509 +0x47 fp=0x7f9104ff8dd0 sp=0x7f9104ff8dc0 pc=0x560262247907

goroutine 133 gp=0xc000ece540 m=9 mp=0xc00113a808 [running]:
runtime.systemstack_switch()
runtime/asm_amd64.s:474 +0x8 fp=0xc000087130 sp=0xc000087120 pc=0x5602622478a8
runtime.(*mheap).alloc(0x140000000?, 0xa0000?, 0xcf?)
runtime/mheap.go:958 +0x5b fp=0xc000087178 sp=0xc000087130 pc=0x5602621fabbb
runtime.(*mcache).allocLarge(0x7f9154536108?, 0x140000000, 0x1)
runtime/mcache.go:234 +0x87 fp=0xc0000871c8 sp=0xc000087178 pc=0x5602621e7227
runtime.mallocgc(0x140000000, 0x560267403320, 0x1)
runtime/malloc.go:1165 +0x597 fp=0xc000087250 sp=0xc0000871c8 pc=0x5602621dd117
runtime.makeslice(0x560267f96120?, 0xc001b806f0?, 0x0?)
runtime/slice.go:107 +0x49 fp=0xc000087278 sp=0xc000087250 pc=0x5602622271a9
github.com/elastic/beats/v7/libbeat/reader/readfile.NewLineReader({0x560267f8cb48, 0xc000c7af00}, {{0x7f910c5f8518, 0x56026af57f80}, 0x140000000, 0x1, 0x2800000, 0x0})
github.com/elastic/beats/v7/libbeat/reader/readfile/line.go:79 +0x19b fp=0xc000087348 sp=0xc000087278 pc=0x560264e4215b
github.com/elastic/beats/v7/libbeat/reader/readfile.NewEncodeReader(...)
github.com/elastic/beats/v7/libbeat/reader/readfile/encode.go:54
github.com/elastic/beats/v7/filebeat/input/log.(*Harvester).newLogFileReader(0xc00106f608)
github.com/elastic/beats/v7/filebeat/input/log/harvester.go:676 +0x1e5 fp=0xc000087450 sp=0xc000087348 pc=0x560264e52285
github.com/elastic/beats/v7/filebeat/input/log.(*Harvester).Setup(0xc00106f608)
github.com/elastic/beats/v7/filebeat/input/log/harvester.go:197 +0x85 fp=0xc0000874f0 sp=0xc000087450 pc=0x560264e4f005
github.com/elastic/beats/v7/filebeat/input/log.(*Input).startHarvester(0xc00101cf00, 0xc001b982e0, {{0xc001aad9f8, 0x15}, {0x0, 0x0}, 0x0, {0x560267fe3288, 0xc0012d9ad0}, {0xc0012f4800, ...}, ...}, ...)
github.com/elastic/beats/v7/filebeat/input/log/input.go:756 +0x125 fp=0xc0000875d8 sp=0xc0000874f0 pc=0x560264e57e45
github.com/elastic/beats/v7/filebeat/input/log.(*Input).scan(0xc00101cf00)
github.com/elastic/beats/v7/filebeat/input/log/input.go:563 +0x5b6 fp=0xc000087b50 sp=0xc0000875d8 pc=0x560264e560b6
github.com/elastic/beats/v7/filebeat/input/log.(*Input).Run(0xc00101cf00)
github.com/elastic/beats/v7/filebeat/input/log/input.go:251 +0x12e fp=0xc000087f28 sp=0xc000087b50 pc=0x560264e53c0e
github.com/elastic/beats/v7/filebeat/input.(*Runner).Run(0xc000e25dc0)
github.com/elastic/beats/v7/filebeat/input/input.go:136 +0x28 fp=0xc000087fa0 sp=0xc000087f28 pc=0x560264e1d5c8
github.com/elastic/beats/v7/filebeat/input.(*Runner).Start.func1()
github.com/elastic/beats/v7/filebeat/input/input.go:129 +0x5b fp=0xc000087fe0 sp=0xc000087fa0 pc=0x560264e1d49b
runtime.goexit({})
runtime/asm_amd64.s:1695 +0x1 fp=0xc000087fe8 sp=0xc000087fe0 pc=0x5602622498e1
created by github.com/elastic/beats/v7/filebeat/input.(*Runner).Start in goroutine 1
github.com/elastic/beats/v7/filebeat/input/input.go:122 +0xf5

goroutine 1 gp=0xc0000061c0 m=nil [chan receive]:
runtime.gopark(0x7f910c216688?, 0x18?, 0x8?, 0x61?, 0x18?)
runtime/proc.go:402 +0xce fp=0xc00209eae8 sp=0xc00209eac8 pc=0x56026220faee
runtime.chanrecv(0xc00010f020, 0x0, 0x1)
runtime/chan.go:583 +0x3bf fp=0xc00209eb60 sp=0xc00209eae8 pc=0x5602621d58ff
runtime.chanrecv1(0xc00012f680?, 0xc001b98230?)
runtime/chan.go:442 +0x12 fp=0xc00209eb88 sp=0xc00209eb60 pc=0x5602621d5512
github.com/elastic/beats/v7/filebeat/beater.(*signalWait).Wait(...)
github.com/elastic/beats/v7/filebeat/beater/signalwait.go:44
github.com/elastic/beats/v7/filebeat/beater.(*Filebeat).Run(0xc001186e00, 0xc001005008)
github.com/elastic/beats/v7/filebeat/beater/filebeat.go:447 +0x1b0a fp=0xc00209f4d8 sp=0xc00209eb88 pc=0x560264eeed6a
github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).launch(0xc001005008, {{0x560266770bb2, 0x8}, {0x560266770bb2, 0x8}, {0x0, 0x0}, 0x1, 0x1, {{0x0, ...}, ...}, ...}, ...)
github.com/elastic/beats/v7/libbeat/cmd/instance/beat.go:772 +0x983 fp=0xc00209fac0 sp=0xc00209f4d8 pc=0x560264dee0c3
github.com/elastic/beats/v7/libbeat/cmd/instance.Run.func1(0xc001c3fbf0, 0xc000f30290)
github.com/elastic/beats/v7/libbeat/cmd/instance/beat.go:225 +0x145 fp=0xc00209fbd0 sp=0xc00209fac0 pc=0x560264debfe5
github.com/elastic/beats/v7/libbeat/cmd/instance.Run({{0x560266770bb2, 0x8}, {0x560266770bb2, 0x8}, {0x0, 0x0}, 0x1, 0x1, {{0x0, 0x0}, ...}, ...}, ...)
github.com/elastic/beats/v7/libbeat/cmd/instance/beat.go:226 +0x1b fp=0xc00209fbf0 sp=0xc00209fbd0 pc=0x560264debe5b
github.com/elastic/beats/v7/libbeat/cmd.genRunCmd.func1(0xc0011c6600?, {0x560266768247?, 0x4?, 0x56026676824b?})
github.com/elastic/beats/v7/libbeat/cmd/run.go:37 +0x58 fp=0xc00209fcc0 sp=0xc00209fbf0 pc=0x560264e03058
github.com/spf13/cobra.(*Command).execute(0xc000d73c08, {0xc000142010, 0xb, 0xb})
github.com/spf13/cobra@v1.8.1/command.go:989 +0xab1 fp=0xc00209fe48 sp=0xc00209fcc0 pc=0x560262b3ded1
github.com/spf13/cobra.(*Command).ExecuteC(0xc000d73c08)
github.com/spf13/cobra@v1.8.1/command.go:1117 +0x3ff fp=0xc00209ff20 sp=0xc00209fe48 pc=0x560262b3e81f
github.com/spf13/cobra.(*Command).Execute(0xc0000061c0?)
github.com/spf13/cobra@v1.8.1/command.go:1041 +0x13 fp=0xc00209ff38 sp=0xc00209ff20 pc=0x560262b3e373
main.main()
github.com/elastic/beats/v7/x-pack/filebeat/main.go:23 +0x1a fp=0xc00209ff50 sp=0xc00209ff38 pc=0x560266764d1a
runtime.main()
runtime/proc.go:271 +0x29d fp=0xc00209ffe0 sp=0xc00209ff50 pc=0x56026220f69d
runtime.goexit({})
runtime/asm_amd64.s:1695 +0x1 fp=0xc00209ffe8 sp=0xc00209ffe0 pc=0x5602622498e1

goroutine 2 gp=0xc000006c40 m=nil [force gc (idle)]:
runtime.gopark(0x0?, 0x0?, 0x0?, 0x0?, 0x0?)

Hello,

As in your previous post, you need to provide context; just posting a stack trace is not helpful.

Please describe what you are trying to do and share your entire filebeat.yml; without it, it is not possible to provide any insight.

Also, share the specs of your machine.

`type or paste code here`
```yaml
filebeat.inputs:
  # Input 1: Kafka Error Logs (from ELK configuration)
  - type: log
    enabled: true
    paths:
      - /home/npc/logs/kafkaError.log
      - /home/npc/logs/kafkaError.log.*
    scan_frequency: 5s         # Check for new/rotated files every 5 seconds
    close_inactive: 10m        # Close files if no new log lines for 10 minutes
    ignore_older: 24h          # Stop reading files older than 24 hours
    fields:
      log_source: "kafka_error"
    fields_under_root: true
    multiline.pattern: '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}'
    multiline.negate: true
    multiline.match: after

  # Input 2: Apache ModSecurity Audit Logs (from Kafka configuration)
  - type: log
    enabled: true
    paths:
      - /opt/jbcs-httpd24-2.4/httpd/logs/modsec_audit.log
      - /opt/jbcs-httpd24-2.4/httpd/logs/modsec_audit.log.*
    harvester_buffer_size: 5368709120
    message_max_bytes: 5368709120
    multiline.max_lines: 5368709120
    scan_frequency: 5s         # Check for new/rotated files every 5 seconds
    close_inactive: 10m        # Close files if no new log lines for 10 minutes
    ignore_older: 24h          # Stop reading files older than 24 hours
    fields:
      log_source: "apache_modsec"
    fields_under_root: true
    multiline.pattern: '^--[a-z0-9]+-A--$'
    multiline.negate: true
    multiline.match: after
    # Exclude NPCWebGUI and NPCAdminGUI requests - don't send to Kafka
    exclude_lines: ['/NPCWebGUI/', '/NPCAdminGUI/']

processors:
  # Processor for Kafka Error Logs
  - if:
      equals:
        log_source: "kafka_error"
    then:
      - script:
          lang: javascript
          source: >
           function process(event) {
              try {
                var logMessage = event.Get("message");
                if (!logMessage || typeof logMessage !== 'string') {
                  event.Put("processing_error", "No valid message field found");
                  return;
                }

                // Extract JSON substring from the log line
                var start = logMessage.indexOf("{");
                var end = logMessage.lastIndexOf("}");
                if (start === -1 || end === -1 || end <= start) {
                  event.Put("processing_error", "No valid JSON structure found");
                  return;
                }

                var rawJson = logMessage.substring(start, end + 1);

                try {
                  var parsed = JSON.parse(rawJson);

                  // Check if this is the nested structure with message.value and message.key
                  if (parsed.message && parsed.message.value && parsed.message.key) {
                    // Extract the key for UUID
                    var key = parsed.message.key;

                    // Parse the inner JSON from the value field
                    try {
                      var innerJson = JSON.parse(parsed.message.value);
                      // Set the inner JSON as the message content
                      event.Put("message", JSON.stringify(innerJson));
                      event.Put("UUID", key);
                      event.Put("processing_status", "success_nested");
                    } catch (innerE) {
                      // If inner JSON parsing fails, use the value as-is
                      event.Put("message", parsed.message.value);
                      event.Put("UUID", key);
                      event.Put("processing_status", "success_simple");
                    }

                  } else if (parsed.message) {
                    // Handle direct message structure
                    event.Put("message", JSON.stringify(parsed.message));

                    // Try to extract key for UUID
                    var keyMatch = rawJson.match(/"key"\s*:\s*"([^"]*)"/);
                    var key = keyMatch ? keyMatch[1] : "N/A";
                    event.Put("UUID", key);
                    event.Put("processing_status", "success_direct");

                  } else {
                    // Handle any other JSON structure
                    event.Put("message", JSON.stringify(parsed));

                    // Try to extract key for UUID
                    var keyMatch = rawJson.match(/"key"\s*:\s*"([^"]*)"/);
                    var key = keyMatch ? keyMatch[1] : "N/A";
                    event.Put("UUID", key);
                    event.Put("processing_status", "success_generic");
                  }

                } catch (parseError) {
                  // Fallback: extract key and put full JSON string
                  var keyMatch = rawJson.match(/"key"\s*:\s*"([^"]*)"/);
                  var key = keyMatch ? keyMatch[1] : "N/A";
                  event.Put("message", rawJson);
                  event.Put("UUID", key);
                  event.Put("processing_status", "fallback");
                  event.Put("processing_error", "JSON parse error: " + parseError.message);
                }

              } catch (generalError) {
                event.Put("processing_error", "General processing error: " + generalError.message);
                event.Put("processing_status", "error");
              }
            }
# Processor for Apache ModSecurity Logs
  - if:
      equals:
        log_source: "apache_modsec"
    then:
      - dissect:
          tokenizer: "%{log}"
          field: "message"
      - script:
          lang: javascript
          source: >
            function unescapeXml(str) {
              return str
                .replace(/&lt;/g, "<")
                .replace(/&gt;/g, ">")
                .replace(/&amp;/g, "&")
                .replace(/&quot;/g, '"')
                .replace(/&#39;/g, "'");
                }
            function process(event) {
              // Get the log message from the event
              var logMessage = event.Get("message");
              var now = new Date();

              function pad(n) {
                return (n < 10 ? '0' : '') + n;
              }

              var currentTimestamp = now.getFullYear().toString()
                                    + pad(now.getMonth() + 1)   // months are 0-based
                                    + pad(now.getDate())
                                    + pad(now.getHours())
                                    + pad(now.getMinutes())
                                    + pad(now.getSeconds());
              var randomNum = Math.floor(Math.random() * 10000000000); // 10-digit max
              var randWithTime = randomNum + "_" + currentTimestamp;

              // Extract status code like "200 OK" or others
              var httpsMatch = logMessage.match(/--[a-z0-9]+-F--\s*HTTP\/1\.1\s+([^\n\r]+)/i);
              var status = httpsMatch ? httpsMatch[1] : "";
              var hostMatch = logMessage.match(/Host: ([^\n]+)/);
              var urlMatch = logMessage.match(/--[a-z0-9]+-B--\nPOST ([^ ]+) /);
              var aSectionMatch = logMessage.match(/--[a-z0-9]+-A--\n([\s\S]*?)--[a-z0-9]+-B--/i);
             var sourceIp = "N/A";
              if (aSectionMatch && aSectionMatch[1]) {
             // Find the first IPv4 address in the --A-- section
             var ipMatch = aSectionMatch[1].match(/([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)/);
              if (ipMatch) {
             sourceIp = ipMatch[1];
             }
              }
              // Extract requestBody between --C-- and --F-- (Request body - XML/JSON)
              var requestMatch = logMessage.match(/--[a-z0-9]+-C--\n([\s\S]*?)\n--[a-z0-9]+-F--/);
              var requestBody = requestMatch ? requestMatch[1].trim() : "N/A";

              // Robust extraction of <NPCData> block
              if (requestBody !== "N/A") {
                var xmlMatch = requestBody.match(/<NPCData[\s\S]*?<\/NPCData>/i);
                if (!xmlMatch) {
                  var unescaped = unescapeXml(requestBody);
                  xmlMatch = unescaped.match(/<NPCData[\s\S]*?<\/NPCData>/i);
                }
                if (xmlMatch) {
                  requestBody = xmlMatch[0];
                }
              }
              event.Put("debug_NPCData", requestBody);
              // Extract responseBody between --E-- and --H-- (Response body - XML/JSON)
              var responseMatch = logMessage.match(/--[a-z0-9]+-E--\n([\s\S]*?)\n--[a-z0-9]+-H--/);
              var responseBody = responseMatch ? responseMatch[1].trim() : "N/A";
              if (responseBody !== "N/A") {
                responseBody = unescapeXml(responseBody);
              }
               // Remove multipart/form-data headers (e.g., Content-Disposition) if present
              if (requestBody !== "N/A") {
                // Remove lines like 'Content-Disposition: ...' and any boundary lines
                requestBody = requestBody.replace(/^Content-Disposition:.*(\r?\n)?/gm, '').trim();
                // Remove any boundary lines (e.g., ---------------------...)
                requestBody = requestBody.replace(/^[-]{2,}.*$/gm, '').trim();
              }
              // Extract fields from JSON, XML, or escaped XML
              function extractField(body, field) {
                var m = body.match(new RegExp('<' + field + '>([^<]+)</' + field + '>', 'i')) ||
                        body.match(new RegExp('&lt;' + field + '&gt;([\\s\\S]*?)&lt;/' + field + '&gt;', 'i')) ||
                        body.match(new RegExp('"' + field + '"\\s*:\\s*"([^"]+)"', 'i'));
                return (m && m[1]) ? m[1].trim() : "N/A";
              }

              // Extract MessageID (case-sensitive)
              function getMessageID(body) {
                var m = body.match(/<MessageID>([^<]+)<\/MessageID>/) ||
                        body.match(/&lt;MessageID&gt;([^&]+)&lt;\/MessageID&gt;/) ||
                        body.match(/"MessageID"\s*:\s*"([^"]+)"/);
                return (m && m[1]) ? m[1] : "N/A";
              }

              var messageId = getMessageID(requestBody);
              if (messageId === "N/A") messageId = getMessageID(responseBody);
              if (messageId === "N/A") messageId = extractField(requestBody, "auditMessageID");
              if (messageId === "N/A") messageId = extractField(responseBody, "auditMessageID");
             var portingArea = extractField(requestBody, "PortingArea");
              if (portingArea === "N/A" || !portingArea) {
                portingArea = extractField(responseBody, "PortingArea");
              }
             var portType = extractField(requestBody, "PortType");
              if (portType === "N/A" || !portType) {
                portType = extractField(responseBody, "PortType");
              }
               var msgCreateTimeStamp = extractField(requestBody, "MsgCreateTimeStamp");
              if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
                msgCreateTimeStamp = extractField(responseBody, "MsgCreateTimeStamp");
              }
              if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
                msgCreateTimeStamp = extractField(responseBody, "timeStamp");
              }
              if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
                msgCreateTimeStamp = extractField(requestBody, "timeStamp");
              }
             var sender = extractField(requestBody, "Sender");
              if (sender === "N/A" || !sender) {
                sender = extractField(responseBody, "Sender");
              }
              if (sender === "N/A" || !sender) {
                sender = extractField(requestBody, "userId");
              }
              // Extract Content-Type to determine transport type (if still needed in other ways)
              var contentTypeMatch = logMessage.match(/Content-Type:\s*([^\n\r]+)/);
              var contentType = contentTypeMatch ? contentTypeMatch[1].toLowerCase() : "N/A";

              // Extract portId or TransactionID
              var portId = extractField(requestBody, "portId");
              if (portId === "N/A") portId = extractField(responseBody, "portId");
              if (portId === "N/A") portId = extractField(requestBody, "TransactionID");
              if (portId === "N/A") {
                portId = extractField(responseBody, "TransactionID");
                // Debug: log if TransactionID extraction fails
                if (portId === "N/A" && responseBody.indexOf("TransactionID") !== -1) {
                  event.Put("debug_transactionid_search", "TransactionID found but not extracted from response");
                }
              }

              // Extract Number (Phone Number) from requestBody or responseBody
                            var number = extractField(requestBody, "Number");
                            if ((!number || number === "N/A") && responseBody && responseBody !== "N/A") {
                              number = extractField(responseBody, "Number");
                            }
                            if (!number || number === "N/A") {
                              number = extractField(requestBody, "NumberFrom");
                              if ((!number || number === "N/A") && responseBody && responseBody !== "N/A") {
                                number = extractField(responseBody, "NumberFrom");
                              }
                            }
              // Function to replace any content within <ws:attachedFiles> tags with "NA"
              function replaceAttachedFiles(xmlContent) {
                return xmlContent.replace(/<ws:attachedFiles>[^<]*<\/ws:attachedFiles>/g, "");
              }

              function maskJwtToken(resp) {
                try {
                  return resp.replace(/"jwtToken"\s*:\s*"([^"]+)"/, function(match, token) {
                    if (token.length > 4) {
                      var masked = "*******";
                      return '"jwtToken":"' + masked + '"';
                    } else {
                      return '"jwtToken":"' + "*".repeat(token.length) + '"';
                    }
                  });
                } catch (e) {
                  return resp; // fallback if regex fails
                }
              }

              function maskPassword(resp) {
                try {
                  return resp.replace(/"password"\s*:\s*"([^"]+)"/, function(match, password) {
                    return '"password":"' + "*******" + '"';
                  });
                } catch(e) {
                  return resp;
                }
              }

              // Extract the content sections from the log message and apply the replacement function
                   // Extract the content sections from the log message and apply the replacement function
             // var contentMatch = logMessage.match(/--[a-z0-9]+-C--\n([\s\S]*?)\n--[a-z0-9]+-F--/);
             // var requestBody = (contentMatch && contentMatch[1]) ? contentMatch[1] : "N/A";
              if (requestBody !== "N/A") {
                requestBody = replaceAttachedFiles(requestBody);
                requestBody = maskPassword(requestBody);
                var firstIdx = requestBody.indexOf('Content-Disposition:');
                if (firstIdx !== -1) {
                  var secondIdx = requestBody.indexOf('Content-Disposition:', firstIdx + 1);
                  if (secondIdx !== -1) {
                    requestBody = requestBody.substring(0, secondIdx).trim();
                  }
                }

                requestBody = requestBody.replace(/^Content-Disposition:.*(\r?\n)?/gm, '').trim();
                requestBody = requestBody.replace(/^[-]{2,}.*$/gm, '').trim();
                 // Extract only the JSON block for downstream use
                var jsonRequestBody = "N/A";
                var jsonMatch = requestBody.match(/\{[\s\S]*\}/);
                if (jsonMatch && jsonMatch[0]) {
                  requestBody = jsonMatch[0].trim();
                  // Remove everything from 'Content-Type:' onward
                  var ctIdx = requestBody.indexOf('Content-Type:');
                  if (ctIdx !== -1) {
                    requestBody = requestBody.substring(0, ctIdx).trim();
                  }
                }
              }
              var responseBodyMatch = logMessage.match(/--[a-z0-9]+-E--\n([\s\S]*?)\n--[a-z0-9]+-H--/);
              if (responseBodyMatch) {
                responseBodyMatch[1] = replaceAttachedFiles(responseBodyMatch[1]);
                responseBodyMatch[1] = maskJwtToken(responseBodyMatch[1]);
                responseBodyMatch[1] = maskPassword(responseBodyMatch[1]);
              }

              // Fallback to randWithTime if portId or number is N/A
              var uuid = (portId !== "N/A" || number !== "N/A") ? portId || number : randWithTime;
              //var request = (contentMatch && contentMatch[1] && contentMatch[1].trim()) ? contentMatch[1].trim() : "N/A";
               var request = (requestBody && requestBody.trim()) ? requestBody.trim() : "N/A";
              var response = (responseBodyMatch && responseBodyMatch[1] && responseBodyMatch[1].trim()) ? responseBodyMatch[1].trim() : "N/A";
              // Mask fileContent in JSON response
              if (response !== "N/A" && response.indexOf('"fileContent"') !== -1) {
                response = response.replace(/("fileContent"\s*:\s*")([\s\S]*?)(")/, '$1N/A$3');
              }
              var hostValue = hostMatch && hostMatch[1] ? hostMatch[1].split(',')[0].trim() : "N/A";
              var url = (hostValue !== "N/A" && urlMatch && urlMatch[1]) ? ("https://" + hostValue + urlMatch[1]) : "N/A";
              // Prepare the JSON object with extracted data
              var jsonObject = {
                "header": {
                  "messageId": randWithTime,
                  "timeStamp": currentTimestamp.toString(),
                  "source": "npc-apache-server",
                  "version": "1.0",
                  "type": status === "200 OK" ? "npc-apache-success" : "npc-apache-error" // Conditional type based on status
                },
                "payload": {
                  "portingArea": portingArea,
                  "portType": portType,
                  "messageID": messageId,
                  "msgCreateTimeStamp": msgCreateTimeStamp,
                  "senderOperator": sender,
                  "httpsStatusCode": status,
                  "portId": portId,
                  "firstPhoneNumber": number,
                  "sourceIp": sourceIp,
                  "url": url,
                  "request": request,
                  "response": response
                }
              };

              // Replace the original message with the processed JSON object
              event.Put("message", JSON.stringify(jsonObject));
              event.Put("UUID", uuid);
            }

  

# Kafka Output Configuration (unified for both sources)
output.kafka:
  hosts: ["${KAFKA_HOST}"]  # Updated to use the SSL-enabled broker
  topic: "${KAFKA_TOPIC}"
  key: '%{[UUID]}'
  codec.format:
    string: '%{[message]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  compression_level: 4
  max_message_bytes: 1000000
  version: "2.1.0"
  client_id: "filebeat-prod"
  bulk_max_size: 4096
  bulk_flush_frequency: 1s
  channel_buffer_size: 1024
  keep_alive: 30s
  max_retries: 5
  backoff.init: 2s
  backoff.max: 120s
  timeout: 60s
  broker_timeout: 20s

  # Enhanced Security Configuration
  sasl.mechanism: SCRAM-SHA-512
  username: "${KAFKA_USERNAME}"
  password: "${KAFKA_PASSWORD}"
  security.protocol: SASL_SSL
  ssl.enabled: true
  #ssl.verification_mode: full
  ssl.certificate_authorities: ["/etc/filebeat/certs/ca-cert.pem"]
  #ssl.verification_mode: none
 

# Enhanced Logging Configuration
logging.level: debug  # Reduced verbosity for cleaner logs
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644


# Disable console logging to keep it clean
logging.to_stderr: true

# Performance and Monitoring Configuration

# Queue Configuration for better performance
queue.mem:
  events: 4096
  flush.min_events: 512
  flush.timeout: 1s

I assume this is just a continuation of this topic... next time, please keep them together; that would help.

So you set all of these to 5 GB... and now you are running out of memory.

Like @leandrojmp said what is the spec of the host and what else is running on that.

But I'll ask a higher-level question. Why are you trying to read in 4 GB logs or attachments in the first place? Perhaps you should provide a little context about what you're actually trying to accomplish. There may be a different way.

I am sending SOAP and REST requests to Apache. The Apache mod_security logs print the full request and response. In my requests, I am attaching files such as PDFs and DOCs, and this attachment content is also printed in the mod_security logs. The problem is: if the file size is around 1 MB, Filebeat is able to read it, but if the file is larger, Filebeat is not able to read the entire event from the mod_security log. My maximum file size can be up to 5 GB.

We can identify the file content. We do not need to read the file contents, such as PDF, DOC, and image files.

How do you run Filebeat — on-prem or in a container?

Via the systemctl command on RHEL 9.

Apologies... I am a bit confused. Are you saying that, if it is possible to detect the attached content:

A) You want to DROP the content (PDF, DOC, etc) and NOT ingest it into elasticsearch
or
B) You DO want to ingest the content (PDF, DOC, etc) and DO ingest it into elasticsearch

We want to drop the attachment content (PDF, DOC, images) when reading from the mod_security logs.