How to use event.set to get the values of a variable?

I'm using Kafka's input plugin within my logstash pipline and I have enabled decorated_events => true

If I want to get the kafka topic and partition names I can do this:

     mutate {
         add_field => { "[topic_name]" => "%{[@metadata][kafka][topic]}"}
         add_field => { "[partition]" => "%{[@metadata][kafka][partition]}"}
     }

However, I'm trying to use event.set inside of the aggregate plugin to capture the same data, but its not working.

This is my aggregate plugin (please see the second to last line):

if [in-octets] and [out-octets] {
            aggregate {
                remove_field => [ "state" ]
                task_id => "%{device}-%{interface-name}_mbps"
                timeout_task_id_field => "timeout-mbps"
                inactivity_timeout => 90
                timeout_timestamp_field => "@timestamp"
                push_map_as_event_on_timeout => true
                timeout_tags => ['_done']
                code => "
                if map['previousTime'] == nil
                map['previousTime'] = event.get('timestamp');
                map['previousInOctets'] = event.get('in-octets');
                map['previousOutOctets'] = event.get('out-octets');
                else
                map['timeDifference'] = event.get('timestamp') - map['previousTime'];
                map['inOctetsDiff'] = event.get('in-octets') - map['previousInOctets'];
                map['outOctetsDiff'] = event.get('out-octets') - map['previousOutOctets'];

                # Division by zero check here
                if map['timeDifference'] > 0
                map['in-mbps'] = map['inOctetsDiff'].to_f / map['timeDifference'].to_f * 8.0 / 1000.0 / 1000.0;
                map['out-mbps'] = map['outOctetsDiff'].to_f / map['timeDifference'].to_f * 8.0 / 1000.0 / 1000.0;
                else
                # Set difference to 0
                map['in-mbps'] = 0
                map['out-mbps'] = 0
                end
                map['previousTime'] = event.get('timestamp');
                map['previousInOctets'] = event.get('in-octets');
                map['previousOutOctets'] = event.get('out-octets');
                event.set('in-mbps', map['in-mbps'].round(3));
                event.set('out-mbps', map['out-mbps'].round(3));
                if map['description'] != nil
                event.set('description', map['description']);
                end
                if map['admin-status'] != nil
                event.set('admin-status', map['admin-status']);
                end
                if map['in-errors'] != nil
                event.set('in-errors', map['in-errors']);
                end
                if map['out-errors'] != nil
                event.set('out-errors', map['out-errors']);
                end
                event.set('site', event.get('device')[0..3])
                event.set('a-node', event.get('device').split('.')[0])
                event.set('topic_name', map["%{[@metadata][kafka][partition]]}")
                end
                "
            }
        }
    }

I have also tried map[[@metadata][kafka][partition]] but that didn't work either. How can I accomplish this?

If you use event.set then that modifies the event that is passing through the aggregate filter. You never add anything like [@metadata][kafka][partition] to the map, so that will set [topic_name] to nil.

I suspect that what you want is

map["topic_name"] = event.get("[@metadata][kafka][partition]")

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.