Logstash aggregate syslog entries


I am receiving syslog messages from ClearPass servers and using Logstash to ingest them into Elasticsearch.
Some of these messages arrive split across multiple syslog packets, but they are really one (big) message.
I am trying to combine them (without much success) into one message before sending them to Elasticsearch.
My grok entry is as follows:

grok {
        match => [
                 "message", "<%{POSINT:syslog_pri}>%{TIMESTAMP_ISO8601:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program} %{POSINT:syslog_pid} %{NONNEGINT:num_of_msgs} %{NONNEGINT:msg_num} %{GREEDYDATA:message}",
                 "message", "<%{POSINT:syslog_id}>\.\.\.%{GREEDYDATA:message}"
        ]
        overwrite => [ "message" ]
}

When a message has multiple entries, "num_of_msgs" reflects the number of them (1, 2, 3, 4, etc.) and "msg_num" is the sequence number of each entry (0, 1, 2, etc.).
The common element between all entries is "syslog_id".

The idea is to concatenate all "message" entries with the same "syslog_id" in the proper order, create a single entry with that information, and submit it to Elasticsearch. I also do not want the partial messages in Elasticsearch, only the combined one.

I tried this, but it is not doing what I want at all.

if [num_of_msgs] > "1" {
           aggregate {
              task_id => "%{syslog_pid}"
              code => "map['message'] ||=' '; map['message'] +=%{\n}+ event.get('message')"
              map_action => "create_or_update"
              push_map_as_event_on_timeout => true
              timeout => 10
              timeout_tags => ['aggregated']
           if "aggregated" not in [tags] {

I did a lot of Google searching but did not find anything I could follow.
Can you please help me with pointers or suggestions?

Thank you

Do these messages have a unique ID? So, for example, if an event comes in over 4 packets, does each packet contain a field with the same value that is globally unique? You could potentially use that as the document ID, and the event would then be updated in Elasticsearch.

Since you don't list it, I would assume you are using the TCP input to receive the events. As long as the separate packets are contiguous, you might be able to use the multiline codec to stitch the events together.

Thank you for your answer

The common element between all entries is "syslog_pid".
I am using UDP

So if I receive these two syslog entries like this:
.... CPPM_blah_blah 1656456 2 0 "message 123 123 123"
.... CPPM_blah_blah 1656456 2 1 "message 456 456 456"

I want to submit to elasticsearch the following:
.... CPPM_blah_blah 1656456 1 0 "message 123 123 123 message 456 456 456"
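
To make the goal concrete, here is a minimal Ruby sketch of the reassembly step on its own, outside Logstash. The sample records are hypothetical; only the field names come from the grok pattern above. It assumes the fragments share "syslog_pid" and that "msg_num" gives their order.

```ruby
# Hypothetical fragment records; field names mirror the grok captures above.
# They are deliberately listed out of order, as can happen over UDP.
fragments = [
  { "syslog_pid" => "1656456", "num_of_msgs" => "2", "msg_num" => "1", "message" => "message 456 456 456" },
  { "syslog_pid" => "1656456", "num_of_msgs" => "2", "msg_num" => "0", "message" => "message 123 123 123" },
]

# Group by the shared id, order by sequence number (as integers), join the text.
def reassemble(fragments)
  fragments.group_by { |f| f["syslog_pid"] }.map do |id, parts|
    ordered = parts.sort_by { |f| f["msg_num"].to_i }
    { "syslog_pid" => id, "message" => ordered.map { |f| f["message"] }.join(" ") }
  end
end

combined = reassemble(fragments)
puts combined.first["message"]
```

Sorting on the integer value of "msg_num" rather than relying on arrival order is a design choice here, since UDP does not guarantee ordering.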

Ahh....that makes it more complex. I would guess something like

  1. Set up the elasticsearch output to use the syslog_pid as the document id
  2. Perform an elasticsearch lookup in your pipeline filter to pull the previously ingested event data.
  3. Use another filter, maybe mutate, to create the array of values.

There is a way using the logstash aggregate filter plugin, but I cannot figure out the right syntax for my case.


So...and my interpretation may be wrong, but going off your existing 'code' configuration, map['message'] ||=' '; map['message'] +=%{\n}+ event.get('message'), I would read that to say "(1)Map previous event field message as blank if it doesn't exist. (2)Map previous event field message and append with a new line and the value of the current event's message field."

What happens if you simplify it to "event.set('message', map['message'])"? I would expect it to concatenate the values into a single string.

When using push_map_as_event_on_timeout the only fields that will be present on the event are whatever you added to the map.

You could try something like this:

     aggregate {
         task_id => "%{syslog_pid}"
         # Copy the header fields once and collect every fragment's message.
         code => '
             map["syslog_pri"] ||= event.get("syslog_pri")
             map["syslog_timestamp"] ||= event.get("syslog_timestamp")
             map["syslog_hostname"] ||= event.get("syslog_hostname")
             map["syslog_program"] ||= event.get("syslog_program")
             map["syslog_pid"] ||= event.get("syslog_pid")
             map["message"] ||= []
             map["message"] << event.get("message")
         '
         push_map_as_event_on_timeout => true
         timeout => 10
     }

If that syslog_pid really is a process id and not a message id, then two separate messages arriving within 10 seconds of each other will be combined. To avoid that, you would have to stop using push_map_as_event_on_timeout and instead use a second aggregate filter with map_action => "update" and end_of_task => true when [msg_num] is one less than [num_of_msgs] (which would require a ruby filter to do the maths). That would be more like example 2 than example 3 in the aggregate filter documentation.
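
The "maths" mentioned above could be sketched in plain Ruby as follows. The helper name last_fragment? is hypothetical. Inside a ruby filter the same comparison would presumably be applied to event.get("msg_num") and event.get("num_of_msgs"). The sketch assumes both values are still strings, which is what grok produces unless a type is given in the pattern.

```ruby
# Returns true when this fragment is the final one of its group.
# grok captures are strings unless cast, so convert before doing arithmetic.
def last_fragment?(msg_num, num_of_msgs)
  msg_num.to_i == num_of_msgs.to_i - 1
end

puts last_fragment?("0", "2")   # false
puts last_fragment?("1", "2")   # true

# Comparing the raw strings is lexicographic, which is why the conversion matters.
puts "10" > "9"                 # false
puts "10".to_i > "9".to_i       # true
```

The same string-versus-number distinction applies to a conditional such as [num_of_msgs] > "1", which compares text rather than numbers.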

I believe syslog_pid is a message id.

I should rename it to syslog_id or message_id.

aggregate is now working as expected.

But instead of concatenating the messages, it adds the values to an array:

map["message"] << event.get("message")

message {
[0] = " this is line 1"
[1] = "this is line 2"

Is there a way to concatenate to the string instead of adding a new array entry?

Sure, you could

         map["message"] ||= ""
         map["message"] += event.get("message")

possibly adding a delimiter as well.
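
As a rough illustration of the delimiter idea, here is a standalone Ruby sketch. A plain Hash stands in for the aggregate filter's map, and the sample strings and the choice of a single space as delimiter are assumptions.

```ruby
# Plain Hash standing in for the aggregate filter's map (an assumption for illustration).
map = {}
delimiter = " "   # hypothetical choice; a newline would work the same way

["message 123 123 123", "message 456 456 456"].each do |text|
  if map["message"]
    # Later fragments: append the delimiter, then the new text.
    map["message"] += delimiter + text
  else
    # First fragment: start the string without a leading delimiter.
    map["message"] = text
  end
end

puts map["message"]
```

Starting from the first fragment instead of an empty string avoids a stray delimiter at the beginning of the combined message.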

I get this error when I use +=
occurred {:error=>#<TypeError: no implicit conversion of String into Array>,

Did you change map["message"] ||= [] to map["message"] ||= ""?

I just did, and it seems to be working now.
So by putting the square brackets, the variable is defined as an array!
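
The difference can be reproduced in plain Ruby, outside Logstash. This is only a sketch with made-up variable names, showing why += fails on a value initialised with [] and works on one initialised with "".

```ruby
# Initialised with [] the value is an Array: << appends an element,
# but += expects another Array on the right-hand side.
as_array = []
as_array << "this is line 1"
error_text = nil
begin
  as_array += "this is line 2"
rescue TypeError => e
  error_text = e.message
end
puts error_text

# Initialised with "" the value is a String, so += concatenates text.
as_string = ""
as_string += "this is line 1"
as_string += " this is line 2"
puts as_string
```

If the array form is kept, calling join on it when the map is pushed would be another way to end up with a single string.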

Thank you