We are currently using version 7.17.6 across all ELK components.
Yes, we define the document_id in the output section of logstash.conf.
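For reference, these are the relevant lines from the output block quoted in full further down:

    document_id   => "%{[auditEvent][transactionId]}"
    action        => "update"
    doc_as_upsert => true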
Actually, both: 6 nodes and 6 replicas, with one replica on each node.
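If it helps to double-check, the per-node shard placement can be listed with the _cat API (the raw-* index pattern is taken from our output config below):

    GET _cat/shards/raw-*?v&h=index,shard,prirep,state,node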
logstash resources:

logstashJavaOpts: "-Xmx3g -Xms3g"
resources:
  requests:
    cpu: "1000m"
    memory: "1536Mi"
  limits:
    cpu: "1900m"
    memory: "5Gi"
filebeat resources:

resources:
  requests:
    cpu: "100m"
    memory: "100Mi"
  limits:
    cpu: "1900m"
    memory: "5Gi"
Here is the configuration for Filebeat:

filebeat.inputs:
  - type: filestream
    id: project-logs-id
    paths:
      - /data/project/logs/**/*.log*
    close.reader.on_eof: true

queue.disk:
  path: "/usr/share/filebeat/diskqueue"
  max_size: 1GB

output.logstash:
  hosts: '${LOGSTASH_HOSTS}'
  loadbalance: "${LOAD_BALANCING:false}"
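Both ${LOGSTASH_HOSTS} and ${LOAD_BALANCING} are injected through the pod environment. As a minimal sketch (the hostname and values here are illustrative, not our actual settings), the env section of the Filebeat DaemonSet looks something like:

    env:
      - name: LOGSTASH_HOSTS
        value: "logstash-logstash:5044"
      - name: LOAD_BALANCING
        value: "true"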
Here is the configuration for Logstash:

input {
  beats {
    port => 5044
  }
}
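From the Filebeat side, connectivity to this beats input can be verified from inside a pod with:

    filebeat test output

which checks that the configured output.logstash hosts are reachable.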
filter {
  # Derive the index suffix from the log file's parent folder under /data/project/logs.
  grok {
    match => { "[log][file][path]" => "/data/project/logs/(?<logFolder>[^/]+)/.*\.log" }
  }

  # Parse the log line itself; the optional (?:\x1b\[\d+m)* prefix skips ANSI color codes.
  grok {
    match => { "message" => "^(?:\x1b\[\d+m)*%{TIMESTAMP_ISO8601:entryTimestamp} %{LOGLEVEL:logLevel}\s+\[(?<hostname>[^]]+)\] \[(?<logger>[^]]+)\] \[(?<thread>[^]]+)\] \((?<transactionId>[^)]+)\) \((?<businessFlowId>[^)]*)\) operation=%{NOTSPACE:operation}, auditEventMessage=%{GREEDYDATA:auditEventMessage}" }
  }

  date {
    match => ["entryTimestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
    target => "@timestamp"
    timezone => "UTC"
  }

  json {
    source => "auditEventMessage"
    target => "auditEvent"
  }

  mutate {
    remove_field => [ "host", "[agent][hostname]", "[agent][ephemeral_id]", "[agent][id]", "[agent][name]", "[agent][type]", "[agent][version]", "[ecs][version]" ]
  }

  # Promote every auditEvent key that is not in the allow-list to the top level of the event.
  ruby {
    code => 'fields = ["startedAt", "finishedAt", "transactionId", "elapsedTime", "databaseElapsedTime", "componentElapsedTime", "eventType", "componentType", "host",
                       "transactionStatus", "forcedRelease", "errorCode", "errorMessage", "isSecurityEvent", "outputRequest",
                       "outputResponse", "inputRequest", "inputResponse", "thread", "logger", "businessFlowId"]
             audit = event.get("auditEvent")
             if audit
               audit.each do |key, value|
                 unless fields.include?(key)
                   event.set(key, value)
                   event.remove("[auditEvent][#{key}]")
                 end
               end
             end'
  }

  mutate {
    rename => {
      "logLevel"       => "[auditEvent][logLevel]"
      "hostname"       => "[auditEvent][hostname]"
      "logger"         => "[auditEvent][logger]"
      "thread"         => "[auditEvent][thread]"
      "operation"      => "[auditEvent][operation]"
      "entryTimestamp" => "[auditEvent][entryTimestamp]"
      "type"           => "[auditEvent][type]"
    }
    remove_field => [ "auditEventMessage", "message", "path", "[auditEvent][businessFlowId]", "transactionId" ]
  }
}
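To make the message grok above concrete, a made-up log line in the expected format (every value here is illustrative) would be:

    2024-05-13 10:15:42,123 INFO  [host-01] [AuditLogger] [exec-7] (tx-123) (flow-9) operation=createOrder, auditEventMessage={"transactionId":"tx-123","transactionStatus":"OK"}

The JSON in auditEventMessage is what the json filter turns into the auditEvent object, and its transactionId is what the output below uses as the document_id.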
output {
  # Only index events that made it through every filter stage without a failure tag.
  if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] and "_jsonparsefailure" not in [tags] and "_rubyexception" not in [tags] {
    elasticsearch {
      hosts    => ["${ELASTICSEARCH_HOST:elasticsearch:9200}"]
      cacert   => "${ELASTICSEARCH_CERT:/usr/share/logstash/config/certs/elastic-certificate.crt}"
      user     => "${ELASTICSEARCH_USERNAME:default}"
      password => "${ELASTICSEARCH_PASSWORD:default}"
      ssl_certificate_verification => "${CERTIFICATE_VERIFICATION:false}"
      index         => "raw-%{logFolder}"
      document_id   => "%{[auditEvent][transactionId]}"
      action        => "update"
      doc_as_upsert => true
    }
  }
}
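One caveat with this conditional: events that pick up any of those failure tags are dropped silently. If we ever need to keep them for inspection, an else branch could route them to a separate index; a rough sketch (the parse-failures index name is a placeholder, and it would need the same connection settings as above):

    } else {
      elasticsearch {
        hosts => ["${ELASTICSEARCH_HOST:elasticsearch:9200}"]
        index => "parse-failures"
      }
    }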