I'm using Kafka's input plugin within my logstash pipline and I have enabled decorated_events => true
If I want to get the kafka topic and partition names I can do this:
mutate {
add_field => { "[topic_name]" => "%{[@metadata][kafka][topic]}"}
add_field => { "[partition]" => "%{[@metadata][kafka][partition]}"}
}
However, I'm trying to use event.set inside of the aggregate
plugin to capture the same data, but its not working.
This is my aggregate
plugin (please see the second to last line):
if [in-octets] and [out-octets] {
aggregate {
remove_field => [ "state" ]
task_id => "%{device}-%{interface-name}_mbps"
timeout_task_id_field => "timeout-mbps"
inactivity_timeout => 90
timeout_timestamp_field => "@timestamp"
push_map_as_event_on_timeout => true
timeout_tags => ['_done']
code => "
if map['previousTime'] == nil
map['previousTime'] = event.get('timestamp');
map['previousInOctets'] = event.get('in-octets');
map['previousOutOctets'] = event.get('out-octets');
else
map['timeDifference'] = event.get('timestamp') - map['previousTime'];
map['inOctetsDiff'] = event.get('in-octets') - map['previousInOctets'];
map['outOctetsDiff'] = event.get('out-octets') - map['previousOutOctets'];
# Division by zero check here
if map['timeDifference'] > 0
map['in-mbps'] = map['inOctetsDiff'].to_f / map['timeDifference'].to_f * 8.0 / 1000.0 / 1000.0;
map['out-mbps'] = map['outOctetsDiff'].to_f / map['timeDifference'].to_f * 8.0 / 1000.0 / 1000.0;
else
# Set difference to 0
map['in-mbps'] = 0
map['out-mbps'] = 0
end
map['previousTime'] = event.get('timestamp');
map['previousInOctets'] = event.get('in-octets');
map['previousOutOctets'] = event.get('out-octets');
event.set('in-mbps', map['in-mbps'].round(3));
event.set('out-mbps', map['out-mbps'].round(3));
if map['description'] != nil
event.set('description', map['description']);
end
if map['admin-status'] != nil
event.set('admin-status', map['admin-status']);
end
if map['in-errors'] != nil
event.set('in-errors', map['in-errors']);
end
if map['out-errors'] != nil
event.set('out-errors', map['out-errors']);
end
event.set('site', event.get('device')[0..3])
event.set('a-node', event.get('device').split('.')[0])
event.set('topic_name', map["%{[@metadata][kafka][partition]]}")
end
"
}
}
}
I have also tried map[[@metadata][kafka][partition]]
but that didn't work either. How can I accomplish this?