Push_previous_map_as_event if fields exist

In the above JSON this is the sub-interface:
"sub-iface-state-name": "GigabitEthernet0/1/0/20.224". The .224 at the end denotes it's a sub-interface.

All the fields with "sub" in the field name are metrics for that sub-interface. For example, the metrics below belong to the sub-interface GigabitEthernet0/1/0/20.224:

"sub-iface-state-description": "sub-interface description"
"sub-iface-state-out-unicast-pkts": 1398413913,
"sub-iface-state-in-broadcast-pkts": 0,
"sub-iface-state-in-octets": 349426365840,
"sub-iface-state-out-octets": 156623612514,
"sub-iface-state-admin-status": "UP",
..

The physical interface GigabitEthernet0/1/0/20 has the following sub-interfaces:
GigabitEthernet0/1/0/20.221
GigabitEthernet0/1/0/20.222
GigabitEthernet0/1/0/20.223
GigabitEthernet0/1/0/20.224

In Kibana, when I search for the specific router and physical interface, I only see the last sub-interface for that physical interface displayed in the output. It seems like whichever sub-interface metric came last on the stream is what gets stored.

I'm looking for a way to store the physical interface and its sub-interfaces together in one document, if possible. For example, this would be ideal:

"device": routerA
"sub-interface1": ["sub-iface-state-name": "routerA.221", "sub-iface-state-description": "sub-interface description", "sub-iface-state-in-octets": 349426365840, "sub-iface-state-out-octets": 156623612514, ...]
"sub-interface2": ["sub-iface-state-name": "routerA.222", "sub-iface-state-description": "sub-interface description", "sub-iface-state-in-octets": 2222, "sub-iface-state-out-octets": 3333, ...]
"sub-interface3": ...

Are you saying that the sub-interfaces come in as separate events? That the entire document you showed relates to one sub-interface?

If so, can you do an aggregate using "state/name" as the task_id?

Correction to my previous statement: I was only able to get the forked pipeline working, not multiple aggregate stanzas in one pipeline. I tried the aggregate stanzas below in one pipeline but was not able to generate the bit rate fields (in-mbps, out-mbps):

    aggregate {
        task_id => "%{device}-%{interface-name}"
        push_map_as_event_on_timeout => true
        inactivity_timeout => 60
        timeout => 120
        timeout_tags => ['_aggregatetimeout']
        timeout_timestamp_field => "@timestamp"
        timeout_task_id_field => "task_id"
        code => "
            event.set('site', event.get('device')[0..3])
            event.set('a_node', event.get('device').split('.')[0])
            # Copy each field into the map the first time it is seen
            event.to_hash.each { |k,v|
                unless map[k]
                    map[k] = v
                end
            }
            event.cancel
        "
    }

    aggregate {
        task_id => "%{device}-%{interface-name}"
        code => "
            if map['previousTime'] == nil
                map['previousTime'] = event.get('timestamp')
                map['previousInOctets'] = event.get('in-octets')
                map['previousOutOctets'] = event.get('out-octets')
            else
                map['timeDifference'] = event.get('timestamp') - map['previousTime']
                map['inOctetsDiff'] = event.get('in-octets') - map['previousInOctets']
                map['outOctetsDiff'] = event.get('out-octets') - map['previousOutOctets']

                # Division by zero check here
                if map['timeDifference'] > 0
                    map['in-mbps'] = map['inOctetsDiff'].to_f / map['timeDifference'].to_f * 8.0 / 1000.0 / 1000.0
                    map['out-mbps'] = map['outOctetsDiff'].to_f / map['timeDifference'].to_f * 8.0 / 1000.0 / 1000.0
                else
                    # Set the rates to 0
                    map['in-mbps'] = 0
                    map['out-mbps'] = 0
                end
                map['previousTime'] = event.get('timestamp')
                map['previousInOctets'] = event.get('in-octets')
                map['previousOutOctets'] = event.get('out-octets')
                event.set('in-mbps', map['in-mbps'].round(3))
                event.set('out-mbps', map['out-mbps'].round(3))
            end
        "
    }

Yes, sub-interface data comes in as multiple separate events. I think I need to keep working with forked pipelines until I can create an individual document with metrics for the physical interface and separate documents with metrics for each sub-interface.

@Badger, does the timeout_code option in the aggregate filter support Ruby code?
Should something like this work?

timeout_code => "
   if map['previousTime'] == nil
      map['previousTime'] = event.get('timestamp')
   end
"

Yes, timeout_code allows you to run arbitrary Ruby code when a timeout occurs.

A more common way to write that particular snippet would be

timeout_code => "
      map['previousTime'] ||= event.get('timestamp')
"

I almost got my pipeline-to-pipeline communication working. I'm seeing data generated as I want it but I'm also seeing these errors:

Aggregate exception occurred {:error=>#<TypeError: nil can't be coerced into Integer>

Aggregate exception occurred {:error=>#<NoMethodError: undefined method `-' for nil:NilClass>

What should I be looking at in the data that is causing these exceptions?

Problems with nil will often mean you are referencing a field's value without checking that it exists.

The first could be caused (just an example) by x = someArray[event.get("someField")] when there is no [someField] on the event. The second could be caused by y = event.get("a") - event.get("b") when there is no [a] field on the event.

You can fix these by testing whether the value is nil before using it:

a = event.get("a")
b = event.get("b")
if a and b
    y = a - b
end

Thanks, that helped get rid of those errors.
Question: how can I tweak the code below so that it does not map any field beginning with "sub"?

        event.to_hash.each { |k,v|
            unless map[k]           # Over here I don't want any fields beginning with the word "sub"
                map[k] = v
            end
        }
        event.cancel

Try

    event.to_hash.each { |k,v|
        if k !~ /^sub/
            unless map[k]
                map[k] = v
            end
        end
    }
    event.cancel
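(k !~ /^sub/ is Ruby's negated match operator, so the copy only happens for keys that do not start with "sub".)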

Thanks, that helped clean up the data.
Question: is it possible inside the aggregate filter to check whether a task_id exists?

For example:

aggregate {
  task_id => "%{device}-%{interface-name}-%{sub-iface-state-state-index}"
  if [task_id] {
    # proceed with executing the aggregate filter
  } else { drop_event }
}

If you want to test whether a field exists, inside the code block you can use

if event.get("someField")
    ...
end
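Another option (a sketch using the field names from your example) is to guard the aggregate at the configuration level, since a task_id built from missing fields would contain the literal %{...} text:

    # Only run the aggregate when every field referenced by task_id exists
    if [device] and [interface-name] and [sub-iface-state-state-index] {
        aggregate {
            task_id => "%{device}-%{interface-name}-%{sub-iface-state-state-index}"
            code => "
                # aggregate logic here, for example:
                map['last-seen'] = event.get('timestamp')
            "
        }
    }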

I'm getting closer to achieving my end result, which is being able to store the physical-interface and sub-interface data in separate documents inside the same index.

However, I don't understand why I'm not able to generate the sub-in-mbps/sub-out-mbps bit rates from pipeline20, while the in-mbps/out-mbps bit rates are being generated from pipeline30.

I believe the aggregate filters in pipeline20 and pipeline30 are not working properly, because if I uncomment event.cancel() then no events get stored in Elasticsearch. Also, no events are being generated with the _aggregatetimeout-bitrate-sub or _aggregatetimeout-bitrate-phy timeout tags.

This is my pipelines.yml file:

- pipeline.id: pipeline1
  pipeline.workers: 1
  queue.type: persisted
  config.string: |
    input {
      kafka {
        client_id => "test-v2"
        topics => ["test-topic"]
        group_id => "test-v3"
        bootstrap_servers => "testServer4.test.com:9093,testServer5.test.com:9093,testServer6.test.com:9093"
        consumer_threads => 1
        codec => "json"
        security_protocol => "SSL"
        ssl_keystore_location => "/usr/share/logstash/config/test.jks"
        ssl_keystore_password => "test"
        ssl_keystore_type => PKCS12
        ssl_truststore_location => "/usr/share/logstash/config/test_ca.jks"
        auto_offset_reset => "latest"
      }
    }

    filter {
      if [fields] {
        ruby {
          code => "
            event.get('fields').each { |k, v|
              event.set(k,v)
            }
            event.remove('fields')
          "
        }
      }

      if [tags] {
        ruby {
          code => "
            event.get('tags').each { |k, v|
              event.set(k,v)
            }
            event.remove('tags')
          "
        }
      }

      date {
        match => [ "timestamp", "UNIX" ]
      }

      aggregate {
        task_id => "%{device}-%{interface-name}-%{sub-iface-state-state-index}"
        push_map_as_event_on_timeout => true
        timeout => 60
        timeout_tags => ['_aggregatetimeout-sub']
        timeout_timestamp_field => "@timestamp"
        timeout_task_id_field => "task_id"
        timeout_code => "event.set('test1', 'from sub agg timeout_code stanza')"
        code => "
            event.set('site', event.get('device')[0..3])
            event.set('a_node', event.get('device').split('.')[0])
            event.to_hash.each { |k,v|
              unless map[k]
                  map[k] = v
              end
            }
            # event.set('test', 'bazzinga-sub')
            event.set('test2', 'from sub agg code stanza')
            event.cancel()
        "
      }

      aggregate {
        task_id => "%{device}-%{interface-name}"
        push_map_as_event_on_timeout => true
        # inactivity_timeout => 60
        timeout => 60
        timeout_tags => ['_aggregatetimeout-phy']
        timeout_timestamp_field => "@timestamp"
        timeout_task_id_field => "task_id"
        timeout_code => "event.set('test3', 'from phy agg timeout_code stanza')"
        code => "
            event.set('site', event.get('device')[0..3])
            event.set('a_node', event.get('device').split('.')[0])
            event.to_hash.each { |k,v|
                if k !~ /^sub/
                  unless map[k]
                      map[k] = v
                  end
                end  
            }
            event.set('test4', 'from phy agg code stanza')
            event.cancel()
        "
      } 
    }

    output { 
      # stdout { }
      if [device] and [interface-name] and [sub-iface-state-state-index] {
        pipeline { send_to => "PL1" } 
      } else {
        pipeline { send_to => "PL2"}
      }
    }

- pipeline.id: pipeline20
  queue.type: persisted
  config.string: |
    input { pipeline { address => "PL1" } }
    filter {
      
      date {
        match => [ "timestamp", "UNIX" ]
      }

      # if [sub-iface-state-in-octets] {
      aggregate {
        task_id => "%{device}-%{interface-name}-%{sub-iface-state-state-index}"
        timeout => 120
        timeout_tags => ['_aggregatetimeout-bitrate-sub']
        timeout_timestamp_field => "@timestamp"
        timeout_code => "event.set('test5', 'from sub bitrate timeout_code stanza')"
        code => "
          event.to_hash.each { |k,v|
            unless map[k]
              map[k] = v
            end
          }
          if map['previousTime'] == nil
            map['previousTime'] = event.get('timestamp');
            map['previousSubInOctets'] = event.get('sub-iface-state-in-octets');
            map['previousSubOutOctets'] = event.get('sub-iface-state-out-octets');
            # event.set('testField', 'preTime_is_nil_but_got_current_time')
          else
            # event.set('testField', 'preTime_is_not_nil')
            map['timeDifference'] = event.get('timestamp') - map['previousTime'];
            map['subInOctetsDiff'] = event.get('sub-iface-state-in-octets') - map['previousSubInOctets'];
            map['subOutOctetsDiff'] = event.get('sub-iface-state-out-octets') - map['previousSubOutOctets'];
            if map['timeDifference'] > 0
              map['sub-in-mbps'] = map['subInOctetsDiff'].to_f / map['timeDifference'].to_f * 8.0 / 1000.0 / 1000.0;
              map['sub-out-mbps'] = map['subOutOctetsDiff'].to_f / map['timeDifference'].to_f * 8.0 / 1000.0 / 1000.0;
            else
              # Set difference to 0
              map['sub-in-mbps'] = 0
              map['sub-out-mbps'] = 0
            end
            map['previousTime'] = event.get('timestamp');
            map['previousSubInOctets'] = event.get('sub-iface-state-in-octets');
            map['previousSubOutOctets'] = event.get('sub-iface-state-out-octets');
            event.set('sub-in-mbps', map['sub-in-mbps'].round(3));
            event.set('sub-out-mbps', map['sub-out-mbps'].round(3));
          end
          event.set('test6', 'from sub bitrate code stanza')
          # event.cancel()
        "
      }
      # }
      
      mutate {
        remove_field => [ "host", "timestamp" ]
        lowercase => ["a_node", "site"]
      }  
    }
    output { 
      #stdout { }
      elasticsearch {
        index => "test-bit-rate-%{+YYYY.MM.dd}"
        hosts => "https://server1.test.net:443"
        user => "user1"
        password => "user1pass"
        manage_template => false
        http_compression => true    
        sniffing => false
      }
    }

- pipeline.id: pipeline30
  queue.type: persisted
  config.string: |
    input { pipeline { address => "PL2" } }
    filter {
      
      date {
        match => [ "timestamp", "UNIX" ]
      }

      # if "Loopback" in [interface-name] { drop {} }
      # if ![admin-status] { drop {} }

      if [in-octets] {
      aggregate {
        task_id => "%{device}-%{interface-name}"
        timeout => 120
        timeout_tags => ['_aggregatetimeout-bitrate-phy']
        timeout_timestamp_field => "@timestamp"
        timeout_code => "event.set('test7', 'from phy bitrate timeout_code stanza')"
        code => "
          event.to_hash.each { |k,v|
              unless map[k]
                map[k] = v
              end
          }
          if map['previousTime'] == nil
            map['previousTime'] = event.get('timestamp');
            map['previousInOctets'] = event.get('in-octets');
            map['previousOutOctets'] = event.get('out-octets');
          else
            map['timeDifference'] = event.get('timestamp') - map['previousTime'];
            map['inOctetsDiff'] = event.get('in-octets') - map['previousInOctets'];
            map['outOctetsDiff'] = event.get('out-octets') - map['previousOutOctets'];

            # Division by zero check here
            if map['timeDifference'] > 0
              map['in-mbps'] = map['inOctetsDiff'].to_f / map['timeDifference'].to_f * 8.0 / 1000.0 / 1000.0;
              map['out-mbps'] = map['outOctetsDiff'].to_f / map['timeDifference'].to_f * 8.0 / 1000.0 / 1000.0;
            else
              # Set difference to 0
              map['in-mbps'] = 0
              map['out-mbps'] = 0
            end
            map['previousTime'] = event.get('timestamp');
            map['previousInOctets'] = event.get('in-octets');
            map['previousOutOctets'] = event.get('out-octets');
            event.set('in-mbps', map['in-mbps'].round(3));
            event.set('out-mbps', map['out-mbps'].round(3));
          end
          event.set('test8', 'from phy bitrate code stanza')
          # event.cancel()
        "
      }
      }
    }
    output { 
      #stdout { }
      elasticsearch {
        index => "test-bit-rate-%{+YYYY.MM.dd}"
        hosts => "https://server1.test.net:443"
        user => "user1"
        password => "user1pass"
        manage_template => false
        http_compression => true    
        sniffing => false
      }
    }

You may have reached (or even passed) the limit of what it makes sense to do in an aggregate filter. Or four.

It might make sense to go back to the initial data set, and review whether the approach you have managed to piece together actually makes sense given how much you have learned about logstash in the last several weeks.

If you think it does then I would suggest sending the output from each of the three pipelines to files using a file output, which will dump data as JSON. Then review each file and make sure the data looks the way you think it does.
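For example (a sketch; pick a distinct path for each pipeline), you could temporarily add a file output next to the elasticsearch output:

    output {
        # Dumps each event as one JSON object per line for review
        file {
            path => "/tmp/pipeline20-debug.json"
            codec => "json_lines"
        }
    }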

If there are two aggregate filters in one pipeline configuration, is it possible to prevent the output of the first aggregate from passing through the second aggregate? I'm noticing that the output from the second aggregate contains the timeout_tags from the first aggregate.

Sure, you can add a tag in the first aggregate and make the second one conditional on the tag not being present.
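Something like this (a minimal sketch; the extra tag name is arbitrary). Events pushed by the first aggregate carry its timeout_tags, so the second aggregate can be made conditional on that tag not being present:

    aggregate {
        task_id => "%{device}-%{interface-name}"
        push_map_as_event_on_timeout => true
        timeout => 60
        timeout_tags => ['_aggregatetimeout', 'from_first_aggregate']
        code => "
            event.to_hash.each { |k,v| map[k] ||= v }
        "
    }

    # Skip events that were produced by the first aggregate's timeout
    if "from_first_aggregate" not in [tags] {
        aggregate {
            task_id => "%{device}-%{interface-name}"
            code => "
                map['events_seen'] ||= 0
                map['events_seen'] += 1
            "
        }
    }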

Is it possible to put this code inside the timeout_code stanza?

I tried the following but am getting an error (NameError: undefined local variable or method `map'):

timeout_code => '
        event.to_hash.each { |k,v|
          unless map[k]
              map[k] = v
          end
        }
    '

No. When the timeout_code block is called the map has already been converted to an event and the map itself deleted.
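Inside timeout_code the aggregated fields are already on the generated event, so read and write them through event.get / event.set instead. A minimal sketch:

    timeout_code => "
        # The map has already been converted into this event, so operate on the event
        if event.get('previousTime').nil?
            event.set('previousTime', event.get('timestamp'))
        end
    "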

Hi @Badger, I'm going down a new rabbit hole with the aggregate plugin :slightly_smiling_face:

I have CPU sensor data where each metric (property-name) for a specific component-name comes in as a separate event. For example, there can be two separate events for FPC4:NPU1, each one containing a different property-name.

I want to use the same code from above to aggregate all the field values for a specific task_id inside one document and store it in Elasticsearch, but it does not seem to work. The only way to get it to work is to manually map the fields.

This does not work:

if [component-name] and [property-name] and [device] {
    aggregate {
        task_id => "%{device}-%{component-name}"
        timeout_task_id_field => "task_id"
        timeout_timestamp_field => "@timestamp"
        code => "
            event.to_hash.each { |k, v|
                unless map[k]
                    map[k] = v
                end
            }
            event.cancel()
        "
        push_previous_map_as_event => true
    }
}

This works:

if [component-name] and [property-name] and [device] {
    aggregate {
        task_id => "%{device}-%{component-name}"
        timeout_task_id_field => "task_id"
        timeout_timestamp_field => "@timestamp"
        code => "
            map['device'] ||= event.get('device')
            map['component-name'] ||= event.get('component-name')
            map['name'] ||= event.get('name')
            map[event.get('property-name')] = event.get('property-value')
            event.cancel()
        "
        push_previous_map_as_event => true
    }
}

Also, I don't know why I had to do:
map[event.get('property-name')] = event.get('property-value')
instead of:
map['property-name'] = event.get('property-value')

When the data is aggregated I get one document for FPC4:NPU1 (there are many more property-names in it than I could fit into one screenshot), which is what I was trying to accomplish with the cooler-looking code stanza. :slightly_smiling_face:

The second will create a field called [property-name] with a value like 27,952. The first will create a field called [mem-util-next-hop-exception-bytes-allocated] with that value.
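In other words (a sketch, assuming two hypothetical input events for the same component):

    # Event 1: property-name = "mem-util-next-hop-exception-bytes-allocated", property-value = 27952
    # Event 2: property-name = "mem-util-next-hop-exception-bytes-free",      property-value = 1024

    # map[event.get('property-name')] = event.get('property-value') produces:
    #   map['mem-util-next-hop-exception-bytes-allocated'] = 27952
    #   map['mem-util-next-hop-exception-bytes-free']      = 1024

    # map['property-name'] = event.get('property-value') produces a single field
    # that each later event overwrites:
    #   map['property-name'] = 1024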

Do you know why the generic version is not working? I'd like to use that code to generate my output, since then I don't have to map out each field and value manually.