Push_previous_map_as_event if fields exist

Hi, I need help figuring out a way for Logstash to check if a specific field exists for a specific task_id and then aggregate those fields using push_previous_map_as_event.

For example, my current aggregate filter looks like this:

if [a_node] and [z_node] and [site] {
        aggregate {
            task_id => "%{a_node}-%{z_node}-%{site}"
            timeout_task_id_field => "task_id"
            timeout_timestamp_field => "@timestamp"
            code => "
                map['out-mbps'] ||= event.get('out-mbps')
                map['in-mbps'] ||= event.get('in-mbps')
            "
            push_previous_map_as_event => true
        }
    }

These are the two documents that were aggregated together:

{
                "@version" => "1",
              "@timestamp" => 2020-09-30T19:20:44.000Z,
                  "a_node" => "routerA",
                    "site" => "site1",
                    "path" => "/usr/share/logstash/logs/bb_snmp_clean.out",
                    "host" => "fd8de0398490",
                  "z_node" => "routerB",
                 "in-mbps" => 0.14,
                  "market" => "blue"
}
{
                 "@version" => "1",
                     "path" => "/usr/share/logstash/logs/bb_snmp_clean.out",
               "@timestamp" => 2020-09-30T19:20:44.000Z,
                   "a_node" => "routerA",
                     "site" => "site1",
                     "host" => "fd8de0398490",
                 "out-mbps" => 0.51,
                   "z_node" => "routerB",
                   "market" => "blue"
}

The result, as expected, was this:

{
    "@timestamp" => 2020-09-30T20:23:33.784Z,
       "task_id" => "routerA-routerB-site1",
      "out-mbps" => 0.51,
       "in-mbps" => 0.14,
      "@version" => "1"
}

So instead of doing this:

code => "
    map['out-mbps'] ||= event.get('out-mbps')
    map['in-mbps'] ||= event.get('in-mbps')
"

Is there a way to loop through all the fields that exist for a specific task_id and map them out like I did with 'out-mbps' and 'in-mbps'?

The task_id I'm using generates different fields depending on which a_node and z_node are selected. For example, if the task_id were "routerC-routerD-site2", it wouldn't have 'out-mbps' and 'in-mbps' associated with it. Instead, that task_id would have other fields, like 'cpu', that I would want to map out as I did above.

I was thinking there might be a way to create an array of all the fields I know exist and then use Ruby code to loop through it; if a field for a task_id matched one of those entries, we could map it out.

All source data is being flattened, so there are no nested JSON elements that would hit the aggregate filter.

Thank you.

input { generator { count => 1 lines => [ '{ "task": 1, "in-mb": 10 }', '{ "task": 1, "cpu": 0.1}', '{ "task": 2, "in-mb": 11 }' ] } }
filter {
    json { source => "message" remove_field => [ "message" ] }
    aggregate {
        task_id => "%{task}"
        timeout_task_id_field => "task_id"
        timeout_timestamp_field => "@timestamp"
        code => '
            # copy every field of the event into the map; the first value seen wins
            event.to_hash.each { |k,v|
                unless map[k]
                    map[k] = v
                end
            }
        '
        push_previous_map_as_event => true
    }
}
output  { stdout { codec => rubydebug { metadata => false } } }

will produce these two aggregated events

{
      "task" => 1,
       "cpu" => 0.1,
   "task_id" => "1",
  "sequence" => 0,
     "in-mb" => 10,
  "@version" => "1",
      "host" => "dot.dot",
"@timestamp" => 2020-09-30T21:24:07.164Z
}
{
      "task" => 2,
   "task_id" => "2",
  "sequence" => 0,
     "in-mb" => 11,
      "tags" => [
    [0] "_aggregatefinalflush"
],
  "@version" => "1",
      "host" => "dot.dot",
"@timestamp" => 2020-09-30T21:24:07.251Z
}

@Badger, thank you, that appears to be working. I've now noticed that I need to create a new task_id to aggregate a different set of data points coming in from the same source. Do I just add another aggregate filter with the new task_id under my current one?

That sounds reasonable.

Hmm, not sure what I'm doing wrong. When I add the second aggregate filter, nothing is written to my output file, even though I see the data print to the screen:

filter {    
       aggregate {
            task_id => "%{a_node}-%{z_node}-%{site}"
            timeout_task_id_field => "task_id"
            timeout_timestamp_field => "@timestamp"
            code => '
                event.to_hash.each { |k,v|
                    unless map[k]
                        map[k] = v
                    end
                }
            '
            push_previous_map_as_event => true
        }
        aggregate {
            task_id => "%{device}-%{site}"
            timeout_task_id_field => "task_id"
            timeout_timestamp_field => "@timestamp"
            code => '
                event.to_hash.each { |k,v|
                    unless map[k]
                        map[k] = v
                    end
                }
            '
            push_previous_map_as_event => true
        }
    }

output {
 stdout { codec => rubydebug }
 file {
   path => "/usr/share/logstash/logs/test.out"
   codec => json
 }}

Can you remove the timeout_task_id_field and timeout_timestamp_field options? You are not using a timeout, but the presence of those options will drive the filter through some code you may not want executed.
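
With those removed, the first filter would look something like this:

aggregate {
    task_id => "%{a_node}-%{z_node}-%{site}"
    code => '
        event.to_hash.each { |k,v|
            unless map[k]
                map[k] = v
            end
        }
    '
    push_previous_map_as_event => true
}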

I would suggest enabling log.level debug and see what the aggregate filters have to say.

I commented out those lines in my aggregate filters, but I'm still seeing the same behavior in my Docker Logstash environment.

I trimmed my input file down to only 10 records.

I turned on debug logging and saw Logstash read 10 lines from the input file, followed by a bunch of these:

logstash    | [2020-10-01T21:05:18,827][DEBUG][logstash.filters.aggregate][test][8d036abd68aba2f95541fdc0e0a9400834f3a52b66aacdecfaddaee0b67fef30] Aggregate create_timeout_event call with task_id 'routerA02-routerB01-vist'
logstash    | [2020-10-01T21:05:18,828][DEBUG][logstash.filters.aggregate][test][8d036abd68aba2f95541fdc0e0a9400834f3a52b66aacdecfaddaee0b67fef30] Aggregate successful filter code execution {:code=>"\n            event.to_hash.each { |k,v|\n                unless map[k]\n                    map[k] = v\n                end\n            }\n        "}
logstash    | [2020-10-01T21:05:18,829][DEBUG][logstash.filters.aggregate][test][4c9e8e04671751c5e1f8e48f0add8f5182644bed5ab8e38793370aa5954eee6a] Aggregate create_timeout_event call with task_id '%{device}-vist'
logstash    | [2020-10-01T21:05:18,830][DEBUG][logstash.filters.aggregate][test][4c9e8e04671751c5e1f8e48f0add8f5182644bed5ab8e38793370aa5954eee6a] Aggregate successful filter code execution {:code=>"\n            event.to_hash.each { |k,v|\n                unless map[k]\n                    map[k] = v\n                end\n            }\n        "}

I'm guessing the "Aggregate create_timeout_event" is generated when a task_id isn't matched?

I see 16 records print to the screen, comprising both the original and the aggregated data.

Then I see 16 writes taking place, but right after the last write there is a message stating "Required path was deleted ...":

logstash    | [2020-10-01T21:05:19,095][DEBUG][logstash.outputs.file    ][test][156f4aa0e1a9391f5147a61eb465694676c1d95d797ab7d9c8feaeb0846a6534] File, writing event to file. {:filename=>"/usr/share/logstash/logs/test.out"}
logstash    | [2020-10-01T21:05:19,096][DEBUG][logstash.outputs.file    ][test][156f4aa0e1a9391f5147a61eb465694676c1d95d797ab7d9c8feaeb0846a6534] File, writing event to file. {:filename=>"/usr/share/logstash/logs/test.out"}
logstash    | [ ... 14 more identical "File, writing event to file" lines ... ]
logstash    | [2020-10-01T21:05:19,108][DEBUG][logstash.outputs.file    ][test][156f4aa0e1a9391f5147a61eb465694676c1d95d797ab7d9c8feaeb0846a6534] Required path was deleted, creating the file again {:path=>"/usr/share/logstash/logs/test.out"}
logstash    | [2020-10-01T21:05:19,109][INFO ][logstash.outputs.file    ][test][156f4aa0e1a9391f5147a61eb465694676c1d95d797ab7d9c8feaeb0846a6534] Opening file {:path=>"/usr/share/logstash/logs/test.out"}
logstash    | [2020-10-01T21:05:19,178][DEBUG][logstash.filters.aggregate][test] Aggregate remove_expired_maps call with '%{a_node}-%{z_node}-%{site}' pattern and 1 maps
logstash    | [2020-10-01T21:05:19,182][DEBUG][logstash.filters.aggregate][test] Aggregate create_timeout_event call with task_id 'routerA02-rourterB01-site'
logstash    | [2020-10-01T21:05:19,185][DEBUG][logstash.filters.aggregate][test][4c9e8e04671751c5e1f8e48f0add8f5182644bed5ab8e38793370aa5954eee6a] Aggregate successful filter code execution {:code=>"\n            event.to_hash.each { |k,v|\n                unless map[k]\n                    map[k] = v\n                end\n            }\n        "}
logstash    | [2020-10-01T21:05:19,187][DEBUG][logstash.filters.aggregate][test] Aggregate remove_expired_maps call with '%{device}-%{site}' pattern and 1 maps
logstash    | [2020-10-01T21:05:19,188][DEBUG][logstash.filters.aggregate][test] Aggregate create_timeout_event call with task_id '%{device}-site'
logstash    | {

Then two aggregated records print to the screen with the "_aggregatefinalflush" tag on them, followed by two more writes:

logstash    |                           "tags" => [
logstash    |         [0] "_aggregatefinalflush"
logstash    | [2020-10-01T21:05:19,244][DEBUG][logstash.outputs.file    ][test][156f4aa0e1a9391f5147a61eb465694676c1d95d797ab7d9c8feaeb0846a6534] File, writing event to file. {:filename=>"/usr/share/logstash/logs/test.out"}
logstash    | [2020-10-01T21:05:19,244][DEBUG][logstash.outputs.file    ][test][156f4aa0e1a9391f5147a61eb465694676c1d95d797ab7d9c8feaeb0846a6534] File, writing event to file. {:filename=>"/usr/share/logstash/logs/test.out"}

Now when I check my test.out file, there are only two aggregated records in there, and they are the same records that printed last to the screen in the debug output. When I used my original source file, which had 50K records in it, there was nothing written to the test.out file at all.

I'm getting the feeling this might be an issue isolated to my local Docker Logstash environment, because when I send the data to my ES cluster I see all the data there.

What's weird is that if I comment out either of the two aggregate filters, everything works fine and my test.out file has all the data in it.

It is certainly possible to have two aggregate filters in a pipeline. An example here shows how to join events together into aggregate events with a size limit.

However, the documentation says "all timeout options have to be defined in only one aggregate filter per task_id pattern (per pipeline)". I am wondering if that implies per task_id pattern and per pipeline are equivalent or whether it is saying that in each pipeline you can only have one aggregate with timeout options for each task_id pattern.

If you cannot get it to work then you could try using pipeline to pipeline communication with a forked path pattern. Possibly later using a collector pattern if you want a common output, or have common processing after the aggregation.
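
A minimal sketch of the forked path pattern (the pipeline names, addresses, and file paths here are made up, and the aggregate options are the same as above):

# pipelines.yml
- pipeline.id: upstream
  path.config: "/usr/share/logstash/config/upstream.conf"
- pipeline.id: agg-path
  path.config: "/usr/share/logstash/config/agg-path.conf"
- pipeline.id: agg-device
  path.config: "/usr/share/logstash/config/agg-device.conf"

# upstream.conf -- reads the source and forks every event to both aggregation pipelines
output { pipeline { send_to => ["agg-path", "agg-device"] } }

# agg-path.conf -- one aggregate filter per pipeline, each with its own task_id
input { pipeline { address => "agg-path" } }
filter {
    aggregate {
        task_id => "%{a_node}-%{z_node}-%{site}"
        code => '
            event.to_hash.each { |k,v|
                unless map[k]
                    map[k] = v
                end
            }
        '
        push_previous_map_as_event => true
    }
}
output { file { path => "/usr/share/logstash/logs/test.out" codec => json } }

# agg-device.conf would be the same, with task_id => "%{device}-%{site}"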

@Badger, can I delete the original event and just store the new event that contains the task_id field? If possible, how can I do that?

You can add event.cancel to the code option of the aggregate filter to delete the event you are aggregating.

Thanks, event.cancel worked. However, I was expecting to see one event and task_id for each timestamp, since I added the timestamp to my task_id. Instead, I'm seeing multiple events with the same task_id, each with different key/value pairs in it.

My aggregate filter with event.cancel:

if [device-type] == "cisco" {
    aggregate {
        task_id => "%{device}-%{interface-name}-%{timestamp}"
        timeout_task_id_field => "task_id"
        code => "
            event.to_hash.each { |k,v|
                unless map[k]
                    map[k] = v
                end
            }
            event.cancel
        "
        push_previous_map_as_event => true
    }
} else {
    drop {}
}

When using push_previous_map_as_event, the map is pushed as soon as an event with a new task_id arrives. So you need events to be sorted by the task_id to use this.

Can the sorting be done inside the code block? Or do the events need to be sorted elsewhere?

They would have to be sorted before they get to the filter section.
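
To see why, run a generator like the one earlier in the thread, but with task 1 reappearing after task 2:

input { generator { count => 1 lines => [ '{ "task": 1, "in-mb": 10 }', '{ "task": 2, "in-mb": 11 }', '{ "task": 1, "cpu": 0.1 }' ] } }
filter {
    json { source => "message" remove_field => [ "message" ] }
    aggregate {
        task_id => "%{task}"
        code => '
            event.to_hash.each { |k,v|
                unless map[k]
                    map[k] = v
                end
            }
        '
        push_previous_map_as_event => true
    }
}

The arrival of task 2 pushes the map for task 1, and the second occurrence of task 1 then starts a fresh map, so task 1 should end up split across two aggregated events instead of one.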

Hmm...I don't think I can do that with my current architecture. As an alternative, from the 3 events that are in the image above, is there any way I can extract a field out of event1 and insert it into event2?

You might try using push_map_as_event_on_timeout instead of push_previous_map_as_event

Yes, that is exactly what I did. I was able to get everything into one document with the below:

if [device-type] == "cisco" {
    aggregate {
        task_id => "%{device}-%{interface-name}"
        push_map_as_event_on_timeout => true
        inactivity_timeout => 29
        timeout => 59
        timeout_tags => ['_aggregatetimeout']
        timeout_timestamp_field => "@timestamp"
        timeout_task_id_field => "task_id"
        code => "
            event.to_hash.each { |k,v|
                unless map[k]
                    map[k] = v
                end
            }
            event.cancel
        "
    }
}

Question: Since I'm using push_map_as_event_on_timeout in my first aggregation, will it be possible to use push_previous_map_as_event in a second aggregation stanza that will aggregate the data coming out of the first aggregation?

I think so.
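
I have not tested it, but roughly something like this. The second task_id grouping here is hypothetical, and the same sorting caveat as above applies to the second filter, since the events flushed out of the first aggregation arrive in timeout order:

aggregate {
    task_id => "%{device}-%{interface-name}"
    push_map_as_event_on_timeout => true
    inactivity_timeout => 29
    timeout => 59
    timeout_task_id_field => "task_id"
    code => '
        event.to_hash.each { |k,v|
            unless map[k]
                map[k] = v
            end
        }
        event.cancel
    '
}
aggregate {
    task_id => "%{site}"    # hypothetical second grouping field
    push_previous_map_as_event => true
    code => '
        event.to_hash.each { |k,v|
            unless map[k]
                map[k] = v
            end
        }
    '
}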

Hi @Badger, I was able to get the multiple aggregate filters working, and I also got the forked pipeline to work. From a performance perspective, which method do you think is better?

Also, I noticed that with the Ruby code you gave me above I'm overwriting some of my data. For example, each one of my physical interfaces has at least 3 sub-interfaces, and these sub-interfaces also have metrics of their own, such as in-octets and out-octets. Each field for a sub-interface has the word "sub" in its prefix. The field holding the sub-interface name is called sub-iface-state-name.

How can I add logic that will filter on the original task_id I provided, but, if the "sub-iface-state-name" field is detected, creates a nested JSON entry for that sub-iface-state-name field and adds all the sub-* metrics to it?

Here is what a single document with sub-interface metrics looks like at the moment:

"_source": {
    "oper-status": "UP",
    "sub-iface-state-in-octets": 349426365840,
    "sub-iface-state-out-errors": 0,
    "in-unknown-protos": 0,
    "description": "router description;;",
    "out-mac-pause-frames": 0,
    "in-unicast-pkts": 5328173957,
    "sub-iface-state-state-index": 224,
    "in-mac-pause-frames": 0,
    "last-change": 623802,
    "sub-iface-state-ifindex": 153,
    "out-discards": 0,
    "task_id": "router-name-GigabitEthernet0/1/0/20",
    "in-fragment-frame": 0,
    "in-errors": 0,
    "site": "site-name",
    "target": "Interfaces",
    "sub-iface-index": 224,
    "out-multicast-pkts": 365417,
    "sub-iface-state-out-broadcast-pkts": 0,
    "sub-iface-state-last-clear": "2020-08-13T12:37:29Z",
    "out-broadcast-pkts": 107,
    "last-clear": "2020-08-06T07:15:18Z",
    "sub-iface-state-admin-status": "UP",
    "state/enabled": "true",
    "state/ifindex": 120,
    "sub-iface-state-in-unicast-pkts": 2989486695,
    "sub-iface-state-out-discards": 0,
    "sub-iface-state-out-octets": 156623612514,
    "interface-name": "GigabitEthernet0/1/0/20",
    "in-multicast-pkts": 198,
    "mtu": 1514,
    "in-octets": 603405311368,
    "hold-time/state/up": 10,
    "device": "router-a",
    "sub-iface-state-in-unknown-protos": 0,
    "@version": "1",
    "hold-time/state/down": 0,
    "sub-iface-state-description": "sub-interface description",
    "tags": [
      "_aggregatetimeout_p1",
      "_dissectfailure",
      "_grokparsefailure"
    ],
    "in-jabber-frames": 0,
    "in-discards": 0,
    "out-octets": 615942176669,
    "sub-iface-state-out-unicast-pkts": 1398413913,
    "sub-iface-state-in-broadcast-pkts": 0,
    "out-errors": 0,
    "sub-iface-state-last-change": 624302,
    "a_node": "a-node",
    "sub-iface-state-in-multicast-pkts": 1,
    "out-unicast-pkts": 5375874829,
    "out-8021q-frames": 0,
    "sub-iface-state-multicast": 40477,
    "in-crc-errors": 0,
    "state/name": "GigabitEthernet0/1/0/20",
    "in-oversize-frames": 0,
    "sub-iface-state-name": "GigabitEthernet0/1/0/20.224",
    "sub-iface-state-enabled": "true",
    "admin-status": "UP",
    "@timestamp": "2020-11-05T19:45:25.000Z",
    "sub-iface-state-oper-status": "UP",
    "name": "interfaces",
    "in-8021q-frames": 0,
    "in-broadcast-pkts": 0,
    "sub-iface-state-in-errors": 0,
    "sub-iface-state-in-discards": 0,
    "type": "ethernetCsmacd"
  }

I would not hazard a guess. Test and measure it.

You say you sometimes have 3 sub-interfaces, but give no indication of what the data would look like in that case.

For the event you have given, you could do something like:

if [sub-iface-state-name] {
    ruby {
        code => '
            h = {}
            # collect every sub-iface-state* field into a hash and
            # remove it from the top level of the event
            event.to_hash.each { |k, v|
                if k =~ /^sub-iface-state/
                    h[k] = v
                    event.remove(k)
                end
            }
            event.set("someField", h)
        '
    }
}
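
That moves every sub-iface-state* field under a single someField object. If the goal is one nested entry per sub-interface, then for the single-sub-interface event shown above a variant could strip the prefix and store the result as an array entry. This is only a sketch, assuming the sub-* fields of different sub-interfaces never share one event (the "sub-interfaces" field name is made up):

if [sub-iface-state-name] {
    ruby {
        code => '
            h = {}
            event.to_hash.each { |k, v|
                if k =~ /^sub-iface-state-/
                    # strip the prefix so the nested keys read as plain metric names
                    h[k.sub(/^sub-iface-state-/, "")] = v
                    event.remove(k)
                end
            }
            # an array with one entry per sub-interface; here only one is present
            event.set("sub-interfaces", [ h ])
        '
    }
}

With several sub-interfaces per event the flat sub-* fields would collide with each other, so, as noted above, we would need to see what that data actually looks like.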