I am getting syslogs from ClearPass servers and using Logstash to ingest them into Elasticsearch.
Some of these messages arrive split across multiple syslog packets, but they are really one (big) message.
I am trying to combine them (without much success) into one message before sending them to Elasticsearch.
My grok entry is as follows:
When a message has multiple entries, "num_of_msgs" reflects the number of them (1, 2, 3, 4, etc.) and "msg_num" is the sequence number of each message (0, 1, 2, etc.).
The common element between all entries is "syslog_id".
The idea is to concatenate all "message" entries with the same "syslog_id" in the proper order, create one entry with that information, and submit it to Elasticsearch. Also, I do not want the partial messages in Elasticsearch, only the combined one.
I tried this, but it is not doing what I want at all.
```
if [num_of_msgs] > "1" {
  aggregate {
    task_id => "%{syslog_pid}"
    code => "map['message'] ||=' '; map['message'] +=%{\n}+ event.get('message')"
    map_action => "create_or_update"
    push_map_as_event_on_timeout => true
    timeout => 10
    timeout_tags => ['aggregated']
  }
  if "aggregated" not in [tags] {
    drop{}
  }
}
```
I did a lot of Google searching, but I have not really found anything I can follow.
Can you please help me with pointers or suggestions?
Do these messages have a unique ID? So, for example, if an event comes in over 4 packets, does each packet contain a field with the same globally unique value? You could potentially use that as the index ID, and the event would be updated in place in Elasticsearch.
Since you don't say, I would assume you are using the TCP input to receive the events. As long as the separate packets are contiguous, you might be able to use the multiline codec to stitch the events together.
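A minimal sketch of that multiline idea, assuming a TCP input on a hypothetical port and that the raw line looks like the samples later in this thread (`... CPPM_... <syslog_id> <num_of_msgs> <msg_num> "..."`), so that any line with a non-zero msg_num continues the previous one; the pattern is a guess you would need to adapt to your real format:

```
input {
  tcp {
    port => 5514   # hypothetical port
    codec => multiline {
      # Match lines whose msg_num (fourth field after the CPPM tag) is non-zero
      # and glue them onto the previous line.
      pattern => "CPPM_\S+ \d+ \d+ [1-9]\d* "
      what => "previous"
    }
  }
}
```

This only works if the fragments of one message arrive contiguously on the same connection, which UDP does not guarantee.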
The common element between all entries is "syslog_pid".
I am using UDP
So if I receive these two syslog entries:

```
.... CPPM_blah_blah 1656456 2 0 "message 123 123 123"
.... CPPM_blah_blah 1656456 2 1 "message 456 456 456"
```

I want to submit the following to Elasticsearch:

```
.... CPPM_blah_blah 1656456 1 0 "message 123 123 123 message 456 456 456"
```
So... my interpretation may be wrong, but going off your existing `code` configuration, `map['message'] ||=' '; map['message'] +=%{\n}+ event.get('message')`, I would read that as: (1) initialize the map's message field to a blank if it doesn't exist; (2) append a newline and the value of the current event's message field to it.
What happens if you simplify it to `event.set('message', map['message'])`? I would expect it to concatenate the values into a single string.
If that syslog_pid really is a process id and not a message id, then if you get two messages within 10 seconds this will combine them. To avoid that, you would have to stop using push_map_as_event_on_timeout and instead use a second aggregate filter with `map_action => "update"` and `end_of_task => true` when [msg_num] is one less than [num_of_msgs] (which would require a ruby filter to do the maths). More like example 2 than example 3.
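A rough sketch of that two-filter approach, assuming the field names from this thread (syslog_id, num_of_msgs, msg_num), that both counters arrive as strings, and that fragments arrive in order; this is untested against real ClearPass data:

```
filter {
  # Flag the final fragment: msg_num == num_of_msgs - 1 (the arithmetic
  # needs a ruby filter, since Logstash conditionals can't do it).
  ruby {
    code => "event.set('[@metadata][last_part]', event.get('msg_num').to_i == event.get('num_of_msgs').to_i - 1)"
  }

  if [num_of_msgs] != "1" {
    if [@metadata][last_part] {
      # Final fragment: prepend the accumulated text and close the task.
      aggregate {
        task_id => "%{syslog_id}"
        code => "event.set('message', map['message'].to_s + event.get('message'))"
        map_action => "update"
        end_of_task => true
        timeout => 10
      }
    } else {
      # Earlier fragments: accumulate their text in the map, then drop
      # them so only the combined event reaches Elasticsearch.
      aggregate {
        task_id => "%{syslog_id}"
        code => "map['message'] ||= ''; map['message'] += event.get('message')"
        map_action => "create_or_update"
      }
      drop {}
    }
  }
}
```

Note that the aggregate filter keeps its map in memory, so the pipeline must run with `pipeline.workers: 1`; and since UDP does not guarantee ordering, truly robust reassembly would store each fragment keyed by msg_num in the map and join them in order at the end.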