Combine Metricbeat events into one with Logstash

Dear community

I have spent several days trying to combine Metricbeat events into one with Logstash.
I understand it should be done with the aggregate filter of Logstash. Still, I haven't figured it out.

I use Logstash to process events from Metricbeat. Metricbeat receives the original SNMP polling events from Prometheus (remote write).
For a given instance and interface, with the same timestamp, I get two events that look like this:
Event 1

"_source": {
    "prometheus": {
      "metrics": {
        "InCRC": 10
      },
      "labels": {
        "job": "jobname",
        "instance": "instance",
        "ifIndex": "123"
      }
    },
    "metricset": {
      "name": "remote_write"
    },
 },

Event 2

"_source": {
    "prometheus": {
      "metrics": {
        "ifAdminStatus": 1,
        "ifName": 1,
        "ifOperStatus": 1,
        "ifSpeed": 100000000,
        "ifInDiscards": 2104369,
        "ifInOctets": 4069550338
      },
      "labels": {
        "job": "jobname",
        "instance": "instance",
        "ifIndex": "123"
      }
    },
    "metricset": {
      "name": "remote_write"
    },
 },

How can I combine these two events into a single document?

What tells you that those two events should be combined, and not some other arbitrary pair of events?

Hey!

prometheus.labels.instance together with prometheus.labels.ifName uniquely identify the events (along with the same @timestamp).

The first event does not have [prometheus][labels][ifName], so you cannot use that. Both events have [prometheus][labels][ifIndex], so you could try using that:

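    mutate { remove_field => [ "event", "log" ] }
    aggregate {
        task_id => "%{[prometheus][labels][ifIndex]}"
        # Emit the map as a new event once the task times out
        push_map_as_event_on_timeout => true
        timeout => 10
        timeout_code => ''
        # Runs for every event: collect it in the map, then drop the original
        code => '
            map["data"] ||= []
            map["data"] << event.to_hash
            event.cancel
        '
    }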

Or maybe just merge the [prometheus][metrics] fields:

    mutate { remove_field => [ "event", "log" ] }
    aggregate {
        task_id => "%{[prometheus][labels][ifIndex]}"
        push_map_as_event_on_timeout => true
        timeout => 5
        # Runs for every event: merge its fields and metrics into the map, then drop it
        code => '
            map["data"] ||= {}
            map["metrics"] ||= {}
            metrics = event.remove("[prometheus][metrics]")
            map["data"] = map["data"].merge(event.to_hash)
            map["metrics"] = map["metrics"].merge(metrics)
            event.cancel
        '
        # Runs on the event created from the map: restore the original field layout
        timeout_code => '
            event.remove("data").each { |k, v| event.set(k,v) }
            event.set("[prometheus][metrics]", event.remove("metrics"))
        '
    }
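
To make the intent of the merge easier to check outside Logstash, here is a minimal plain-Ruby sketch of the same idea. It is only an illustration under assumptions: events are modelled as nested hashes rather than real Logstash event objects, the helper names task_id and combine are hypothetical, and the key combines instance and ifIndex. It does not reproduce timeouts or any other pipeline behaviour.

```ruby
# Plain-Ruby sketch of the merge logic; events are nested hashes, not Logstash events.

# Hypothetical helper: builds a composite key from instance and ifIndex.
def task_id(event)
  labels = event["prometheus"]["labels"]
  "#{labels["instance"]}_#{labels["ifIndex"]}"
end

# Hypothetical helper: mimics `code` for every event, then `timeout_code` once per task.
def combine(events)
  maps = {}
  events.each do |event|
    map = (maps[task_id(event)] ||= { "data" => {}, "metrics" => {} })
    copy = Marshal.load(Marshal.dump(event))         # deep copy, input stays untouched
    metrics = copy["prometheus"].delete("metrics") || {}
    map["data"] = map["data"].merge(copy)            # shallow merge of the remaining fields
    map["metrics"] = map["metrics"].merge(metrics)   # union of all metrics seen so far
  end
  # One merged document per task, with the metrics put back in place
  maps.values.map do |map|
    merged = map["data"]
    merged["prometheus"]["metrics"] = map["metrics"]
    merged
  end
end

labels = { "job" => "jobname", "instance" => "instance", "ifIndex" => "123" }
event1 = { "prometheus" => { "metrics" => { "InCRC" => 10 }, "labels" => labels.dup },
           "metricset" => { "name" => "remote_write" } }
event2 = { "prometheus" => { "metrics" => { "ifAdminStatus" => 1, "ifOperStatus" => 1,
                                            "ifSpeed" => 100000000 },
                             "labels" => labels.dup },
           "metricset" => { "name" => "remote_write" } }

combined = combine([event1, event2])
puts combined.length
puts combined.first["prometheus"]["metrics"].keys.sort.join(", ")
```

If the hashes merge as expected here but nothing arrives in the output, the problem is more likely in how the timeout event is produced or routed than in the merge itself.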

Thanks!
I tried with task_id => "%{[prometheus][labels][instance]}_%{[prometheus][labels][ifIndex]}"

Neither option worked.
With event.cancel, nothing reaches the output (Elasticsearch).

As a workaround I used:

    map['InCRC'] ||= event.get('[prometheus][metrics][InCRC]')
    event.set('[prometheus][metrics][InCRC]', map['InCRC'])

So at the moment I copy what I need from one event to the other and keep them both.
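
For anyone reading along, a rough plain-Ruby sketch of what those two lines do may help. It is an assumption-laden simulation: events are plain hashes, a single map stands in for the aggregate map of one task_id, and copy_in_crc, crc_event and status_event are hypothetical names. It suggests the copy depends on arrival order: the value is only available to events that arrive after the one carrying InCRC.

```ruby
# Plain-Ruby sketch of the workaround; `map` stands in for the aggregate map of one task_id.

# Hypothetical helper: applies the two workaround lines to each event in arrival order.
def copy_in_crc(events)
  map = {}
  events.each do |event|
    metrics = event["prometheus"]["metrics"]
    map["InCRC"] ||= metrics["InCRC"]   # remember the first non-nil value seen
    metrics["InCRC"] = map["InCRC"]     # write it onto the current event
  end
  events
end

# Hypothetical builders so each run starts from fresh hashes.
def crc_event
  { "prometheus" => { "metrics" => { "InCRC" => 10 } } }
end

def status_event
  { "prometheus" => { "metrics" => { "ifOperStatus" => 1 } } }
end

in_order = copy_in_crc([crc_event, status_event])
reversed = copy_in_crc([status_event, crc_event])

puts in_order.last["prometheus"]["metrics"].inspect
puts reversed.first["prometheus"]["metrics"].inspect
```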

Hello, @Badger

Unfortunately, the code you proposed didn't work for me.
Since I am using the workaround and ending up with duplicates, I am still trying to work out how to push the mapped event and use event.cancel in the aggregate filter. At the moment, when I use event.cancel, I see nothing in my output (Elasticsearch).
