How to merge two data sets into one with Logstash?

Hi,

I'm trying to merge two data sets into one document that will get stored in ES.

My first data set looks like this:

"_source": {
    "protocol-name": "BGP",
    "name-tag": "default",
    "@version": "1",
    "host": "lab-aar-deployment-55b8f56f6f-lh2fh-telegraf-agent",
    "device": "aara101.mgt.net",
    "name": "bgp",
    "@timestamp": "2022-02-24T20:25:32.506Z",
    "identifier": "BGP",
    "session-state": "ESTABLISHED",
    "neighbor-address": "2001:578:30:1100:10:10:10:2"
  }

My second data set looks like this:

"_source": {
    "protocol-name": "BGP",
    "name-tag": "default",
    "@version": "1",
    "host": "lab-aar-deployment-55b8f56f6f-lh2fh-telegraf-agent",
    "prefixes-installed": 0,
    "device": "aara101.mgt.net",
    "name": "bgp",
    "@timestamp": "2022-02-24T20:25:32.505Z",
    "identifier": "BGP",
    "neighbor-address": "2001:578:30:1100:10:10:10:2",
    "afi-safi-name": "IPV4_UNICAST"
  }

As you can see, "session-state" is only in the first data set. I would like to merge it into the second data set and create one document that has everything in it.

I believe I need to use the aggregate plugin, but I'm not sure how.

My first failed attempt:

if [session-state] {
    aggregate {
        task_id => "%{device}-%{neighbor-address}"
        code => "
            event.to_hash.each { |k,v|
                unless map[k]
                    map[k] = v
                end
            }
        "
    }
} else {
    aggregate {
        task_id => "%{device}-%{neighbor-address}-%{afi-safi-name}"
        end_of_task => true
        timeout => 60
        code => "
            event.to_hash.each { |k,v|
                unless map[k]
                    map[k] = v
                end
            }
            # event.cancel
        "
    }
}

I would suggest something much simpler:

if [session-state] {
    aggregate {
        task_id => "%{device}-%{neighbor-address}"
        code => '
            map["session-state"] = event.get("session-state")
            event.cancel
        '
    }
} else {
    aggregate {
        task_id => "%{device}-%{neighbor-address}"
        end_of_task => true
        timeout => 60
        code => 'event.set("session-state", map["session-state"])'
    }
}

This gives me a null value for session-state:

"_source": {
    "protocol-name": "BGP",
    "prefixes-sent": 0,
    "name-tag": "default",
    "@version": "1",
    "host": "lab-aar-deployment-55b8f56f6f-lh2fh-telegraf-agent",
    "device": "aara101.mgt.net",
    "prefixes-received": 0,
    "name": "bgp",
    "@timestamp": "2022-02-24T21:31:58.111Z",
    "identifier": "BGP",
    "neighbor-address": "2001:578:30:1100:10:10:10:2",
    "afi-safi-name": "L2VPN_EVPN",
    "session-state": null
  }

A neighbor-address field can have multiple different afi-safi-name fields associated with it. So I used the below as the task_id in the second aggregate statement:
%{device}-%{neighbor-address}-%{afi-safi-name}

However, I still get null for session-state.

You have to use the same value for the task_id option if you expect two aggregates to use the same map.

So I guess this is not possible then since I need to use 2 separate task IDs.

But I don't think you do. You can save the value of [session-state] for "%{device}-%{neighbor-address}" and apply it to any events that have a matching "%{device}-%{neighbor-address}" regardless of their [afi-safi-name].

Remove the end_of_task => true from the second aggregate and perhaps extend the timeout.
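
A minimal sketch of what that change might look like, assuming the rest of the suggested filter stays as it was. With end_of_task => true, the map is deleted as soon as the first non-session-state event is processed, so later events for the same neighbor (with a different [afi-safi-name]) find nothing to copy. The timeout of 300 seconds and the map_action => "update" setting are assumptions added for illustration, not part of the original suggestion; the timeout is set on only one of the two filters because the aggregate documentation asks for timeout options to be defined once per task_id pattern.

```
if [session-state] {
    aggregate {
        task_id => "%{device}-%{neighbor-address}"
        code => '
            # Remember the latest session-state for this device/neighbor pair.
            map["session-state"] = event.get("session-state")
            event.cancel
        '
        # Assumed value: long enough to cover the gap between
        # session-state updates from the device.
        timeout => 300
    }
} else {
    aggregate {
        # Same task_id as above, so both filters share one map.
        task_id => "%{device}-%{neighbor-address}"
        # Assumption: only run the code when a map already exists, so events
        # that arrive before any session-state are passed through unchanged.
        map_action => "update"
        # No end_of_task here: the map must survive so that every
        # afi-safi-name event for this neighbor can read it.
        code => 'event.set("session-state", map["session-state"])'
    }
}
```

Note that the aggregate filter only behaves predictably when the pipeline runs with a single worker (pipeline.workers set to 1), since events for the same task_id must be processed in order.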

I forgot to mention there are other fields that come in which are tied to a unique [afi-safi-name] and need to be stored correctly, for example:

{
  "fields": {
    "prefixes-installed": 86
  },
  "name": "bgp",
  "tags": {
    "afi-safi-name": "IPV4_UNICAST",
    "device": "aara101.mgt.net",
    "host": "lab-aar-deployment-55b8f56f6f-q5t4v-telegraf-agent",
    "identifier": "BGP",
    "name-tag": "default",
    "neighbor-address": "10.10.10.1",
    "path": "",
    "protocol-name": "BGP"
  }
}

Sometimes the other fields come together:

{
  "fields": {
    "prefixes-installed": 0,
    "prefixes-received": 0,
    "prefixes-sent": 0
  },
  "name": "bgp",
  "tags": {
    "afi-safi-name": "L2VPN_EVPN",
    "device": "aara101.mgt.net",
    "host": "lab-aar-deployment-55b8f56f6f-q5t4v-telegraf-agent",
    "identifier": "BGP",
    "name-tag": "default",
    "neighbor-address": "10.10.10.1",
    "path": "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/afi-safis/afi-safi/state",
    "protocol-name": "BGP"
  },
  "timestamp": 1645801410
}

This is why I was using [afi-safi-name] in my task_id inside the aggregate plugin. [session-state] is not unique to an [afi-safi-name]; I just need to capture it from the [device]-[neighbor-address] task_id and store it with the data from the [device]-[neighbor-address]-[afi-safi-name] task_id.

Is it possible to process two different maps within a set timeout period in Logstash?

In that case I honestly have no idea what you are trying to do.
