As event lines arrive from ISE, the first part of the pipeline aggregates the individual lines back into a single event. The aggregate filter, and the Ruby code inside it, only works when the lines are delivered in the right order (that part is, by the way, essentially copied/pasted from https://stackoverflow.com/questions/34259414/syslog-message-in-multi-parts-can-logstash-reassemble-it).
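For reference, the segments of a multi-line event look roughly like this (all values invented for illustration; the format is simply what the first grok pattern below expects, with the same task id on every line, followed by the total segment count and the segment number):

<181>Jun 10 09:21:07 ise01 CISE_Passed_Authentications 0000123456 3 0 2023-06-10 09:21:07.123 +02:00 ... Passed-Authentication: Authentication succeeded, ...
<181>Jun 10 09:21:07 ise01 CISE_Passed_Authentications 0000123456 3 1 UserName=jdoe, ...
<181>Jun 10 09:21:07 ise01 CISE_Passed_Authentications 0000123456 3 2 ..., Class=..., countryCode=...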
We had some problems guaranteeing this, so as a workaround we now re-process in batch: we drop the index of the day, sort the ISE event lines of that day, and pipe the output back through Filebeat.
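One thing worth noting: the aggregate filter is only safe with a single pipeline worker, so even correctly ordered input can be aggregated wrongly if Logstash runs several workers. A minimal way to pin this when starting Logstash from the command line (the config path here is just an example; the equivalent setting is pipeline.workers: 1 in the settings file):

# Force a single worker so the aggregate filter sees event lines in order
bin/logstash -f /opt/elk/config/logstash/iselog.conf -w 1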
The pipeline is quite lengthy; I marked the areas where code was removed for the sake of clarity with ###.
filter {
  if [fields][type] =~ "ISELOG" {
    # Parse an event line
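    # Field semantics (as used by the code below): 'taskid' identifies the
    # event, 'duration' is the total number of segments and 'order' is the
    # zero-based segment index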
    grok {
      patterns_dir => ["/opt/elk/config/logstash/patterns/grok-patterns"]
      match => [
        "message", "(?:\<%{POSINT:syslog_pri}\>)?%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:hostname} %{WORD:kind} %{NOTSPACE:taskid} %{INT:duration:int} %{INT:order:int}( %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME} %{ISO8601_TIMEZONE})? %{GREEDYDATA:rest}"
      ]
    }
    # If this is a single-line event -> no aggregate filter needed;
    # simply keep a copy of the message in a map variable
    if [order] == 0 {
      if [duration] == 1 {
        ruby {
          code => "
            map = {}
            map['the_message'] = {}
            map['the_message']['0'] = event.get('message')
            event.set('message_parts', map['the_message'])
            event.set('tags', 'finish')
          "
        }
      }
      # If this is the first event line of an event split over multiple lines,
      # start an aggregate filter: create the map variable and save a copy of
      # the message into it
      else {
        aggregate {
          task_id => "%{taskid}"
          code => "map['the_message'] ||= {} ; map['the_message']['0'] = event.get('message')"
          map_action => "create"
        }
      }
    }
    else {
      # Keep adding event lines to the map until the last part is received;
      # this ensures the message parts end up in the correct order.
      # PRI, timestamp, etc. only matter on the first line, so keep just the
      # 'rest' matched field here
      aggregate {
        task_id => "%{taskid}"
        code => "
          map['the_message'][event.get('order').to_s] = event.get('rest')
          event.set('message_parts', map['the_message'])
          if map['the_message'].length == event.get('duration') then
            event.set('tags', 'finish')
          end
        "
        map_action => "update"
      }
    }
    # If we got the last part (the tags field contains 'finish'), concatenate the event lines;
    # else drop the event line (there is a copy in the map anyway)
    if "finish" in [tags] {
      # Ruby code to convert the message parts into a single well-formatted string
      ruby {
        code => "
          msg = ''
          a = event.get('[message_parts]')
          num = a.length
          for i in 0..num-1
            msg += a[i.to_s]
          end
          # set the message to the new concatenated one
          event.set('concat_message', msg)
        "
      }
      # Remove the fields we don't need in Elasticsearch
      mutate {
        remove_field => [ "syslog_pri", "timestamp", "hostname", "kind", "taskid", "order", "duration", "rest", "message", "message_parts" ]
      }
      # Now finally match the full event (i.e. the concatenation of all event lines)
      grok {
        match => {
          "concat_message" => ["(?:\<%{POSINT:syslog_pri}\>)?%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:hostname} %{WORD:kind} %{NOTSPACE:taskid} %{INT:duration:int} %{INT:order:int} %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME} %{ISO8601_TIMEZONE} %{GREEDYDATA:attributes}"]
        }
      }
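      # The syslog timestamp carries no year, so the date filter assumes the current one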
      date {
        match => [ "timestamp", "MMM dd HH:mm:ss", "MMM  d HH:mm:ss" ]
      }
      # Parsing / matching depends on the type of event being processed (the type is recorded in the 'kind' field).
      # Authentication events are processed unless they carry no UserName:
      # ISE regularly emits a 'spurious' authentication event that does not correspond to an actual user connection
      if ("CISE_Passed_Authentications" in [kind]) {
        if ([concat_message] =~ /Passed-Authentication: Authentication succeeded/) {
          # Retrieve easily identifiable fields
          grok {
            break_on_match => false
            match => {
              "attributes" => [
                "UserName=%{WORD:user_id}",
                "(?<country_cd>(?<=countryCode=)\d+)",
                "(?<manager_id>(?<=manager=CN=)\w{2}\d{3})",
                "(?<cost_center>(?<=costCenter=)[^,]+)",
                "(?<session_id>(?<=Class=)[^;]+)"
              ]
            }
          }
          # Split the message (attributes) into comma-separated (when not escaped) elements,
          # then split each element on the equal sign (when not escaped) into key/value pairs.
          # Keep the field names we're interested in and strip escapes and superfluous spaces
          ruby {
            code => "
              # ### Removed code snippet - basically formatting some fields and adding them to the event
            "
          }
        }
        # Ignore the event
        else {
          mutate {
            add_tag => ["ignore_event"]
          }
        }
      }
      # ### Processing of other types of messages removed
      # The aggregate filter that ends the task for events reconstituted from N event lines
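      # end_of_task => true closes the task and discards its map, so memory is freed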
      aggregate {
        task_id => "%{taskid}"
        code => ""
        map_action => "update"
        end_of_task => true
      }
      # Drop the fields we no longer need
      mutate {
        remove_field => ["order", "duration", "attributes"]
        rename => { "concat_message" => "message" }
      }
    }
    else {
      # Ignore individual parts
      drop {}
    }
  }
}
output {
  if [fields][type] =~ "ISELOG" {
    elasticsearch {
      hosts => ["https://@@ELK_HOST_LN_01@@:@@ELASTICSEARCH_PORT@@", "https://@@ELK_HOST_LN_02@@:@@ELASTICSEARCH_PORT@@", "https://@@ELK_HOST_LN_03@@:@@ELASTICSEARCH_PORT@@"]
      index => "logstash-iselog-%{+YYYY.MM.dd}"
      # ### Further param removed
    }
  }
}
Below is the script we run to reload events. I'm not too sure about the behaviour of Filebeat when we delete a file and then re-create it: does it position itself correctly?
cd /opt/elk/data/iselog
\rm -f iselog.log.reprocess
# Reprocess the last 3 ISE event files
for file in $(ls -rt iselog.log.20* | tail -3)
do
  # Sort the records we want to reload and append them to the file Filebeat picks up
  sort -u "${file}" >> iselog.log.reprocess
done
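For completeness, the "drop the index of the day" step mentioned above is just a DELETE against Elasticsearch before the sorted file is re-shipped. A minimal sketch, assuming the same host placeholders as the output section (the index date is an example, and you may need to add credentials or CA options depending on your cluster security):

# Delete the daily index before reloading its events (index date is an example)
curl -X DELETE "https://@@ELK_HOST_LN_01@@:@@ELASTICSEARCH_PORT@@/logstash-iselog-2023.06.10"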