We are facing an issue where Filebeat is unable to process or hold a single large event of approximately 5 GB.
Because of this limitation, the event is not shipped successfully to Elasticsearch. This appears to be a Filebeat-side constraint on handling a very large individual event, rather than on handling many smaller events.
We request that you review this issue and advise on:
- The maximum supported size for a single event in Filebeat
- Possible configuration changes or alternatives for handling large events
- The recommended approach to split or otherwise process such large events
Please let us know if any additional information or logs are required from our side. Our current Filebeat configuration is included below for reference.
filebeat.inputs:
# Input 1: Kafka Error Logs (from ELK configuration)
- type: log
enabled: true
paths:
- /home/npc/logs/kafkaError.log
- /home/npc/logs/kafkaError.log.*
scan_frequency: 5s # Check for new/rotated files every 5 seconds
close_inactive: 10m # Close files if no new log lines for 10 minutes
#ignore_older: 24h # Stop reading files older than 24 hours
fields:
log_source: "kafka_error"
fields_under_root: true
multiline.pattern: '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}'
multiline.negate: true
multiline.match: after
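# Hypothetical example: a line beginning "2024-01-15 10:23:45,123 ERROR ..." starts a new event,
# while continuation lines such as stack traces (which do not start with a timestamp) are
# appended to the previous event (negate: true, match: after).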
# Input 2: Apache ModSecurity Audit Logs (from Kafka configuration)
- type: log
enabled: true
paths:
- /opt/jbcs-httpd24-2.4/httpd/logs/modsec_audit.log
- /opt/jbcs-httpd24-2.4/httpd/logs/modsec_audit.log.*
harvester_buffer_size: 1073741824
max_bytes: 1073741824 # the log input uses max_bytes; message_max_bytes applies to the filestream input
multiline.max_lines: 1073741824
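# Note: harvester_buffer_size and max_bytes are sizes in bytes, whereas multiline.max_lines is a
# count of lines aggregated into a single event (default 500), not a byte limit.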
scan_frequency: 5s # Check for new/rotated files every 5 seconds
close_inactive: 10m # Close files if no new log lines for 10 minutes
#ignore_older: 24h # Stop reading files older than 24 hours
fields:
log_source: "apache_modsec"
fields_under_root: true
multiline.pattern: '^--[a-z0-9]+-A--$'
multiline.negate: true
multiline.match: after
# Exclude NPCWebGUI and NPCAdminGUI requests - don't send to Kafka
exclude_lines: ['/NPCWebGUI/', '/NPCAdminGUI/']
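# For reference, ModSecurity serial audit entries are delimited by section markers such as
# "--a1b2c3d4-A--" (transaction start), -B-/-C- (request headers/body), -E-/-F-/-H-
# (response body/headers and audit trailer) and -Z- (end of entry); the multiline pattern above
# starts a new event on each "-A--" marker. (The boundary "a1b2c3d4" is a hypothetical example.)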
# Processors for Apache ModSecurity Logs
processors:
- if:
equals:
log_source: "apache_modsec"
then:
- dissect:
tokenizer: "%{log}"
field: "message"
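# Note: with tokenizer "%{log}" the dissect processor copies the whole message into dissect.log
# (the default target_prefix) without splitting it further.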
- script:
lang: javascript
source: |
function process(event) {
// Raw multiline ModSecurity audit record assembled by the multiline settings above
var logMessage = event.Get("message") || "";
// Minimal XML entity un-escaping helper used by the extraction logic below
// (assumed implementation; the original helper was not included in this snippet)
function unescapeXml(s) {
return s.replace(/&lt;/g, "<").replace(/&gt;/g, ">").replace(/&quot;/g, '"').replace(/&apos;/g, "'").replace(/&amp;/g, "&");
}
// Extract status code like "200 OK" or others
var httpsMatch = logMessage.match(/--[a-z0-9]+-F--\s*HTTP\/1\.1\s+([^\n\r]+)/i);
var status = httpsMatch ? httpsMatch[1] : "";
var hostMatch = logMessage.match(/Host: ([^\n]+)/);
var urlMatch = logMessage.match(/--[a-z0-9]+-B--\nPOST ([^ ]+) /);
var aSectionMatch = logMessage.match(/--[a-z0-9]+-A--\n([\s\S]*?)--[a-z0-9]+-B--/i);
var sourceIp = "N/A";
if (aSectionMatch && aSectionMatch[1]) {
// Find the first IPv4 address in the --A-- section
var ipMatch = aSectionMatch[1].match(/([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)/);
if (ipMatch) {
sourceIp = ipMatch[1];
}
}
// Extract requestBody between --C-- and --F-- (Request body - XML/JSON)
var requestMatch = logMessage.match(/--[a-z0-9]+-C--\n([\s\S]*?)\n--[a-z0-9]+-F--/);
var requestBody = requestMatch ? requestMatch[1].trim() : "N/A";
// Robust extraction of <NPCData> block
if (requestBody !== "N/A") {
var xmlMatch = requestBody.match(/<NPCData[\s\S]*?<\/NPCData>/i);
if (!xmlMatch) {
var unescaped = unescapeXml(requestBody);
xmlMatch = unescaped.match(/<NPCData[\s\S]*?<\/NPCData>/i);
}
if (xmlMatch) {
requestBody = xmlMatch[0];
}
}
event.Put("debug_NPCData", requestBody);
// Extract responseBody between --E-- and --H-- (Response body - XML/JSON)
var responseMatch = logMessage.match(/--[a-z0-9]+-E--\n([\s\S]*?)\n--[a-z0-9]+-H--/);
var responseBody = responseMatch ? responseMatch[1].trim() : "N/A";
if (responseBody !== "N/A") {
responseBody = unescapeXml(responseBody);
}
// Remove multipart/form-data headers (e.g., Content-Disposition) if present
if (requestBody !== "N/A") {
// Remove lines like 'Content-Disposition: ...' and any boundary lines
requestBody = requestBody.replace(/^Content-Disposition:.*(\r?\n)?/gm, '').trim();
// Remove any boundary lines (e.g., ---------------------...)
requestBody = requestBody.replace(/^[-]{2,}.*$/gm, '').trim();
}
// Extract fields from JSON, XML, or escaped XML
function extractField(body, field) {
var m = body.match(new RegExp('<' + field + '>([^<]+)</' + field + '>', 'i')) ||
body.match(new RegExp('<' + field + '>([\\s\\S]*?)</' + field + '>', 'i')) ||
body.match(new RegExp('"' + field + '"\\s*:\\s*"([^"]+)"', 'i'));
return (m && m[1]) ? m[1].trim() : "N/A";
}
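// Hypothetical example: extractField('<PortType>NPCT</PortType>', 'PortType') and
// extractField('{"PortType":"NPCT"}', 'PortType') both return "NPCT".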
// Extract MessageID (case-sensitive)
function getMessageID(body) {
var m = body.match(/<MessageID>([^<]+)<\/MessageID>/) ||
body.match(/<MessageID>([^&]+)<\/MessageID>/) ||
body.match(/"MessageID"\s*:\s*"([^"]+)"/);
return (m && m[1]) ? m[1] : "N/A";
}
var messageId = getMessageID(requestBody);
if (messageId === "N/A") messageId = getMessageID(responseBody);
if (messageId === "N/A") messageId = extractField(requestBody, "auditMessageID");
if (messageId === "N/A") messageId = extractField(responseBody, "auditMessageID");
var portingArea = extractField(requestBody, "PortingArea");
if (portingArea === "N/A" || !portingArea) {
portingArea = extractField(responseBody, "PortingArea");
}
var portType = extractField(requestBody, "PortType");
if (portType === "N/A" || !portType) {
portType = extractField(responseBody, "PortType");
}
var msgCreateTimeStamp = extractField(requestBody, "MsgCreateTimeStamp");
if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
msgCreateTimeStamp = extractField(responseBody, "MsgCreateTimeStamp");
}
if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
msgCreateTimeStamp = extractField(responseBody, "timeStamp");
}
if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
msgCreateTimeStamp = extractField(requestBody, "timeStamp");
}
var sender = extractField(requestBody, "Sender");
if (sender === "N/A" || !sender) {
sender = extractField(responseBody, "Sender");
}
if (sender === "N/A" || !sender) {
sender = extractField(requestBody, "userId");
}
// Extract Content-Type (used to determine the transport type, if needed)
var contentTypeMatch = logMessage.match(/Content-Type:\s*([^\n\r]+)/);
var contentType = contentTypeMatch ? contentTypeMatch[1].toLowerCase() : "N/A";
// Extract portId or TransactionID
var portId = extractField(requestBody, "portId");
if (portId === "N/A") portId = extractField(responseBody, "portId");
if (portId === "N/A") portId = extractField(requestBody, "TransactionID");
if (portId === "N/A") {
portId = extractField(responseBody, "TransactionID");
// Debug: log if TransactionID extraction fails
if (portId === "N/A" && responseBody.indexOf("TransactionID") !== -1) {
event.Put("debug_transactionid_search", "TransactionID found but not extracted from response");
}
}
function maskPassword(resp) {
try {
return resp.replace(/"password"\s*:\s*"([^"]+)"/, function(match, password) {
return '"password":"' + "*******" + '"';
});
} catch(e) {
return resp;
}
}
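// Hypothetical example: maskPassword('{"user":"abc","password":"secret"}') returns
// '{"user":"abc","password":"*******"}' (only the first JSON-style password field is masked).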
// Build the processed JSON object from the extracted fields.
// NOTE: this field set is a reconstruction (the original object-building code was not
// included in this snippet); adjust it to match what the downstream Kafka consumers expect.
var jsonObject = {
"status": status,
"host": hostMatch ? hostMatch[1].trim() : "N/A",
"url": urlMatch ? urlMatch[1] : "N/A",
"sourceIp": sourceIp,
"contentType": contentType,
"MessageID": messageId,
"PortingArea": portingArea,
"PortType": portType,
"MsgCreateTimeStamp": msgCreateTimeStamp,
"Sender": sender,
"portId": portId,
"requestBody": maskPassword(requestBody),
"responseBody": maskPassword(responseBody)
};
// Kafka message key: assumed here to be the extracted MessageID, falling back to portId.
var uuid = (messageId !== "N/A") ? messageId : portId;
// Replace the original message with the processed JSON object
event.Put("message", JSON.stringify(jsonObject));
event.Put("UUID", uuid);
}
# Kafka Output Configuration (unified for both sources)
output.kafka:
hosts: ["${KAFKA_HOST_1}", "${KAFKA_HOST_2}", "${KAFKA_HOST_3}"] # SSL-enabled brokers
topic: "${KAFKA_TOPIC}"
key: '%{[UUID]}'
codec.format:
string: '%{[message]}'
partition.hash:
reachable_only: true
required_acks: 1
compression: gzip
compression_level: 4
max_message_bytes: 1000000
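# Note: max_message_bytes caps the JSON-encoded size of a single event sent to Kafka (about 1 MB
# here, which is also the Filebeat default); events larger than this are dropped, and the
# broker-side message.max.bytes / topic-level max.message.bytes limits still apply.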
version: "2.1.0"
client_id: "filebeat-prod"
bulk_max_size: 4096
bulk_flush_frequency: 1s
channel_buffer_size: 1024
keep_alive: 30s
max_retries: 5
backoff.init: 2s
backoff.max: 120s
timeout: 60s
broker_timeout: 20s
# Enhanced Security Configuration
sasl.mechanism: SCRAM-SHA-512
username: "${KAFKA_USERNAME}"
password: "${KAFKA_PASSWORD}"
security.protocol: SASL_SSL
ssl.enabled: true
#ssl.verification_mode: full
ssl.certificate_authorities: ["/etc/filebeat/certs/ca-cert.pem"]
#ssl.verification_mode: none
# Enhanced Logging Configuration
logging.level: debug # Debug-level logging enabled while investigating this issue
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0644
# Console (stderr) logging; set to false to disable console output once file logging is confirmed
logging.to_stderr: true
# Performance and Monitoring Configuration
# Memory queue - will retry sending when Kafka comes back up
queue.mem:
events: 32768 # Increased capacity to hold more events when Kafka is down
flush.min_events: 2048
flush.timeout: 5s
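# The memory queue flushes to the output once flush.min_events events are buffered or
# flush.timeout elapses, whichever comes first; queued events are held in RAM, so very large
# events increase memory usage proportionally.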
Thanks,
Thirupathi