We are observing approximately a 20-second delay in message delivery from Filebeat to Kafka.
Filebeat is successfully sending messages to Kafka, but there is a noticeable delay before the messages are published to the Kafka topic. This delay is impacting near real-time log processing.
please check any improvment
filebeat.inputs:
# Input 1: Kafka Error Logs (from ELK configuration)
- type: log
enabled: true
paths:
- /home/npc/logs/kafkaError.log
- /home/npc/logs/kafkaError.log.*
scan_frequency: 5s # Check for new/rotated files every 5 seconds
close_inactive: 10m # Close files if no new log lines for 10 minutes
#ignore_older: 24h # Stop reading files older than 24 hours
fields:
log_source: "kafka_error"
fields_under_root: true
multiline.pattern: '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}'
multiline.negate: true
multiline.match: after
# Input 2: Apache ModSecurity Audit Logs (from Kafka configuration)
- type: log
enabled: true
paths:
- /opt/jbcs-httpd24-2.4/httpd/logs/modsec_audit.log
- /opt/jbcs-httpd24-2.4/httpd/logs/modsec_audit.log.*
harvester_buffer_size: 1073741824
message_max_bytes: 1073741824
multiline.max_lines: 1073741824
scan_frequency: 5s # Check for new/rotated files every 5 seconds
close_inactive: 10m # Close files if no new log lines for 10 minutes
#ignore_older: 24h # Stop reading files older than 24 hours
fields:
log_source: "apache_modsec"
fields_under_root: true
multiline.pattern: '^--[a-z0-9]+-A--$'
multiline.negate: true
multiline.match: after
# Exclude NPCWebGUI and NPCAdminGUI requests - don't send to Kafka
exclude_lines: ['/NPCWebGUI/', '/NPCAdminGUI/']
processors:
# Processor for Kafka Error Logs
- if:
equals:
log_source: "kafka_error"
then:
- script:
lang: javascript
source: >
function process(event) {
try {
var logMessage = event.Get("message");
if (!logMessage || typeof logMessage !== 'string') {
event.Put("processing_error", "No valid message field found");
return;
}
// Extract JSON substring from the log line
var start = logMessage.indexOf("{");
var end = logMessage.lastIndexOf("}");
if (start === -1 || end === -1 || end <= start) {
event.Put("processing_error", "No valid JSON structure found");
return;
}
var rawJson = logMessage.substring(start, end + 1);
try {
var parsed = JSON.parse(rawJson);
// Check if this is the nested structure with message.value and message.key
if (parsed.message && parsed.message.value && parsed.message.key) {
// Extract the key for UUID
var key = parsed.message.key;
// Parse the inner JSON from the value field
try {
var innerJson = JSON.parse(parsed.message.value);
// Set the inner JSON as the message content
event.Put("message", JSON.stringify(innerJson));
event.Put("UUID", key);
event.Put("processing_status", "success_nested");
} catch (innerE) {
// If inner JSON parsing fails, use the value as-is
event.Put("message", parsed.message.value);
event.Put("UUID", key);
event.Put("processing_status", "success_simple");
}
} else if (parsed.message) {
// Handle direct message structure
event.Put("message", JSON.stringify(parsed.message));
// Try to extract key for UUID
var keyMatch = rawJson.match(/"key"\s*:\s*"([^"]*)"/);
var key = keyMatch ? keyMatch[1] : "N/A";
event.Put("UUID", key);
event.Put("processing_status", "success_direct");
} else {
// Handle any other JSON structure
event.Put("message", JSON.stringify(parsed));
// Try to extract key for UUID
var keyMatch = rawJson.match(/"key"\s*:\s*"([^"]*)"/);
var key = keyMatch ? keyMatch[1] : "N/A";
event.Put("UUID", key);
event.Put("processing_status", "success_generic");
}
} catch (parseError) {
// Fallback: extract key and put full JSON string
var keyMatch = rawJson.match(/"key"\s*:\s*"([^"]*)"/);
var key = keyMatch ? keyMatch[1] : "N/A";
event.Put("message", rawJson);
event.Put("UUID", key);
event.Put("processing_status", "fallback");
event.Put("processing_error", "JSON parse error: " + parseError.message);
}
} catch (generalError) {
event.Put("processing_error", "General processing error: " + generalError.message);
event.Put("processing_status", "error");
}
}
# Processor for Apache ModSecurity Logs
- if:
equals:
log_source: "apache_modsec"
then:
- dissect:
tokenizer: "%{log}"
field: "message"
- script:
lang: javascript
source: >
function unescapeXml(str) {
return str.replace(/</g, "<").replace(/>/g, ">").replace(/&/g, "&").replace(/"/g, '"').replace(/'/g, "'");
}
function extractSection(log, section) {
var hashMatch = log.match(/--([a-z0-9]+)-A--/);
if (!hashMatch) return "N/A";
var hash = hashMatch[1];
var startMarker = "--" + hash + "-" + section + "--";
var startIdx = log.indexOf(startMarker);
if (startIdx === -1) return "N/A";
var afterStart = log.indexOf('\n', startIdx);
if (afterStart === -1) return "N/A";
afterStart++;
var nextSection = log.slice(afterStart).match(/--[a-z0-9]+-[A-Z]--/);
var endIdx = nextSection ? afterStart + nextSection.index : log.length;
return log.substring(afterStart, endIdx).trim();
}
function process(event) {
var logMessage = event.Get("message");
var now = new Date();
function pad(n, width) {
width = width || 2;
var s = n.toString();
return new Array(width - s.length + 1).join(0) + s;
}
function add(n) {
return (n < 10 ? '0' : '') + n;
}
var tmp_currentTimestamp = now.getFullYear().toString()
+ add(now.getMonth() + 1)
+ add(now.getDate())
+ add(now.getHours())
+ add(now.getMinutes())
+ add(now.getSeconds());
var randomNum = Math.floor(Math.random() * 10000000000);
var randWithTime = randomNum + "_" + tmp_currentTimestamp;
var httpsMatch = logMessage.match(/--[a-z0-9]+-F--\s*HTTP\/1\.1\s+([^\n\r]+)/i);
var status = (httpsMatch && httpsMatch[1]) ? httpsMatch[1].substring(0, 3) : "N/A";
if (status === "N/A") {
var fSection = extractSection(logMessage, "F");
if (fSection !== "N/A") {
var statusMatch = fSection.match(/HTTP\/[\d.]+\s+(\d{3})/i);
status = (statusMatch && statusMatch[1]) ? statusMatch[1] : "500";
}else {
status = "500";
}
}
var hostMatch = logMessage.match(/Host: ([^\n]+)/);
var urlMatch = logMessage.match(/--[a-z0-9]+-B--\nPOST ([^ ]+) /);
var aSectionMatch = logMessage.match(/--[a-z0-9]+-A--\n([\s\S]*?)--[a-z0-9]+-B--/i);
var sourceIp = "N/A";
if (aSectionMatch && aSectionMatch[1]) {
var ipMatch = aSectionMatch[1].match(/([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)/);
if (ipMatch) {
sourceIp = ipMatch[1];
}
}
// Extract request and response bodies using helper function
var requestBody = extractSection(logMessage, "C");
var responseBody = extractSection(logMessage, "E");
// Extract NPCData block from request
if (requestBody !== "N/A") {
var xmlMatch = requestBody.match(/<NPCData[\s\S]*?<\/NPCData>/i);
if (!xmlMatch) {
var unescaped = unescapeXml(requestBody);
xmlMatch = unescaped.match(/<NPCData[\s\S]*?<\/NPCData>/i);
}
if (xmlMatch) {
requestBody = xmlMatch[0];
}
}
event.Put("debug_NPCData", requestBody);
// Unescape XML in response
if (responseBody !== "N/A") {
responseBody = unescapeXml(responseBody);
}
// ENHANCED: Clean multipart form data headers from request body
if (requestBody !== "N/A") {
// Remove Content-Disposition headers
requestBody = requestBody.replace(/^Content-Disposition:.*(\r?\n)?/gm, '').trim();
// Remove boundary lines
requestBody = requestBody.replace(/^[-]{2,}.*$/gm, '').trim();
// ENHANCED: Handle multiple Content-Disposition blocks (keep only first part)
var firstIdx = requestBody.indexOf('Content-Disposition:');
if (firstIdx !== -1) {
var secondIdx = requestBody.indexOf('Content-Disposition:', firstIdx + 1);
if (secondIdx !== -1) {
requestBody = requestBody.substring(0, secondIdx).trim();
}
}
// ENHANCED: Extract only JSON block if present (critical for clean data)
var jsonMatch = requestBody.match(/\{[\s\S]*\}/);
if (jsonMatch && jsonMatch[0]) {
requestBody = jsonMatch[0].trim();
// Remove everything from 'Content-Type:' onward if it somehow remains
var ctIdx = requestBody.indexOf('Content-Type:');
if (ctIdx !== -1) {
requestBody = requestBody.substring(0, ctIdx).trim();
}
}
}
function extractField(body, field) {
var m = body.match(new RegExp('<' + field + '>([^<]+)</' + field + '>', 'i')) ||
body.match(new RegExp('<' + field + '>([\\s\\S]*?)</' + field + '>', 'i')) ||
body.match(new RegExp('"' + field + '"\\s*:\\s*"([^"]+)', 'i')) ||
body.match(new RegExp('"' + field + '"\\s*:\\s*\\[\\s*"([^"]+)"', 'i'));
return (m && m[1]) ? m[1].trim() : "N/A";
}
function getMessageID(body) {
var m = body.match(/<MessageID>([^<]+)<\/MessageID>/) ||
body.match(/<MessageID>([^&]+)<\/MessageID>/) ||
body.match(/"MessageID"\s*:\s*"([^"]+)"/);
return (m && m[1]) ? m[1] : "N/A";
}
var messageId = getMessageID(requestBody);
if (messageId === "N/A") messageId = getMessageID(responseBody);
if (messageId === "N/A") messageId = extractField(requestBody, "auditMessageID");
if (messageId === "N/A") messageId = extractField(responseBody, "auditMessageID");
var portingArea = extractField(requestBody, "PortingArea");
if (portingArea === "N/A" || !portingArea) {
portingArea = extractField(responseBody, "PortingArea");
}
var portType = extractField(requestBody, "PortType");
if (portType === "N/A" || !portType) {
portType = extractField(responseBody, "PortType");
}
var msgCreateTimeStamp = extractField(requestBody, "MsgCreateTimeStamp");
if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
msgCreateTimeStamp = extractField(responseBody, "MsgCreateTimeStamp");
}
if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
msgCreateTimeStamp = extractField(responseBody, "timeStamp");
}
if (msgCreateTimeStamp === "N/A" || !msgCreateTimeStamp) {
msgCreateTimeStamp = extractField(requestBody, "timeStamp");
}
var sender = extractField(requestBody, "Sender");
if (sender === "N/A" || !sender) {
sender = extractField(responseBody, "Sender");
}
if (sender === "N/A" || !sender) {
sender = extractField(requestBody, "userId");
}
var contentTypeMatch = logMessage.match(/Content-Type:\s*([^\n\r]+)/);
var contentType = contentTypeMatch ? contentTypeMatch[1].toLowerCase() : "N/A";
var portId = extractField(requestBody, "portId");
if (portId === "N/A") portId = extractField(responseBody, "portId");
if (portId === "N/A") portId = extractField(requestBody, "TransactionID");
if (portId === "N/A") {
portId = extractField(responseBody, "TransactionID");
if (portId === "N/A" && responseBody.indexOf("TransactionID") !== -1) {
event.Put("debug_transactionid_search", "TransactionID found but not extracted from response");
}
}
var number = extractField(requestBody, "Number");
if ((!number || number === "N/A") && responseBody && responseBody !== "N/A") {
number = extractField(responseBody, "Number");
}
if (!number || number === "N/A") {
number = extractField(requestBody, "NumberFrom");
if ((!number || number === "N/A") && responseBody && responseBody !== "N/A") {
number = extractField(responseBody, "NumberFrom");
}
}
function replaceAttachedFiles(xmlContent) {
return xmlContent.replace(/<ws:attachedFiles>[^<]*<\/ws:attachedFiles>/g, "");
}
function maskJwtToken(resp) {
try {
return resp.replace(/"jwtToken"\s*:\s*"([^"]+)"/, function(match, token) {
if (token.length > 4) {
var masked = "*******";
return '"jwtToken":"' + masked + '"';
} else {
return '"jwtToken":"' + "*".repeat(token.length) + '"';
}
});
} catch (e) {
return resp;
}
}
function maskPassword(resp) {
try {
return resp.replace(/"password"\s*:\s*"([^"]+)"/, function(match, password) {
return '"password":"' + "*******" + '"';
});
} catch(e) {
return resp;
}
}
// Apply masking and cleanup to request body
if (requestBody !== "N/A") {
requestBody = replaceAttachedFiles(requestBody);
requestBody = maskPassword(requestBody);
}
// Apply masking and cleanup to response body
if (responseBody !== "N/A") {
responseBody = maskJwtToken(responseBody);
responseBody = maskPassword(responseBody);
responseBody = replaceAttachedFiles(responseBody);
}
var uuid = (portId !== "N/A" || number !== "N/A") ? portId || number : randWithTime;
var request = (requestBody && requestBody.trim()) ? requestBody.trim() : "N/A";
var response = (responseBody && responseBody.trim()) ? responseBody.trim() : "N/A";
// Mask fileContent in JSON response
if (response !== "N/A" && response.indexOf('"fileContent"') !== -1) {
response = response.replace(/("fileContent"\s*:\s*")([\s\S]*?)(")/, '$1N/A$3');
}
var hostValue = hostMatch && hostMatch[1] ? hostMatch[1].split(',')[0].trim() : "N/A";
var url = (hostValue !== "N/A" && urlMatch && urlMatch[1]) ? ("https://" + hostValue + urlMatch[1]) : "N/A";
var jsonObject = {
"header": {
"messageId": randWithTime,
"timeStamp": tmp_currentTimestamp.toString(),
"source": "npc-apache-server",
"version": "1.0",
"type": status === "200" ? "npc-apache-success" : "npc-apache-error"
},
"payload": {
"portingArea": portingArea,
"portType": portType,
"messageId": messageId,
"msgCreateTimeStamp": msgCreateTimeStamp,
"senderOperator": sender,
"httpsStatusCode": status,
"portId": portId,
"firstPhoneNumber": number,
"sourceIp": sourceIp,
"url": url,
"request": request,
"response": response
}
};
event.Put("message", JSON.stringify(jsonObject));
event.Put("UUID", uuid);
}
# Kafka Output Configuration (unified for both sources)
output.kafka:
hosts: ["${KAFKA_HOST_1}", "${KAFKA_HOST_2}", "${KAFKA_HOST_3}"] # Updated to use the SSL-enabled broker
topic: "${KAFKA_TOPIC}"
key: '%{[UUID]}'
codec.format:
string: '%{[message]}'
partition.hash:
reachable_only: true
required_acks: 1
compression: gzip
compression_level: 4
max_message_bytes: 10485760
version: "2.1.0"
client_id: "filebeat-prod"
bulk_max_size: 4096
bulk_flush_frequency: 1s
channel_buffer_size: 1024
keep_alive: 30s
max_retries: 5
backoff.init: 2s
backoff.max: 120s
timeout: 60s
broker_timeout: 20s
# Enhanced Security Configuration
#sasl.mechanism: SCRAM-SHA-512
#username: "${KAFKA_USERNAME}"
#password: "${KAFKA_PASSWORD}"
security.protocol: PLAINTEXT
#ssl.enabled: true
#ssl.verification_mode: full
#ssl.certificate_authorities: ["/etc/filebeat/certs/ca-cert.pem"]
#ssl.verification_mode: none
# Enhanced Logging Configuration
logging.level: debug # Reduced verbosity for cleaner logs
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0644
# Disable console logging to keep it clean
logging.to_stderr: true
# Performance and Monitoring Configuration
# Memory queue - will retry sending when Kafka comes back up
queue.mem:
events: 32768 # Increased capacity to hold more events when Kafka is down
flush.min_events: 2048
flush.timeout: 5s