Merge message with aggregate filter

Hi,

I am trying to merge two log lines with the aggregate filter, and after a week of browsing the internet and this forum I still cannot get it working. Below are two log lines that share an identifier, ExchangeId.

What I want to achieve is one document in which both JSON lines are parsed: REQ_OUT into the req-out nested object and RESP_IN into the resp-in nested object. Right now I get two documents and the nested objects do not exist. The log line is passing through the json filter, since I do have a meta object.

Can anyone help?

sample input:

    {"timestamp":"2021-04-02T05:49:45.251Z","severity":"INFO","Type":"REQ_OUT","Address":"https://a-link.com","ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2",REQ_OUT:"test1"}
    {"timestamp":"2021-04-02T05:50:45.534Z","severity":"INFO","Type":"RESP_IN","ResponseCode":"200","Address":"https://a-link.com","ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2",RESP_IN:"test2"}

logstash filter:

    filter {
      json {
        source => "message"
        skip_on_invalid_json => "true"
        target => "meta"
      }

      if [meta][ExchangeId] {
        mutate {
            add_field => { "ExchangeId" => "%{[meta][ExchangeId]}"  }
        }
        if [meta][Type] =~ /REQ_OUT/ {
          aggregate {
            task_id => "%{ExchangeId}"
            # first log line, so we save the message to REQ_OUT
            code => "
              map['REQ_OUT'] ||= event.get('message')
            "
            map_action => "create"
            timeout => 15
            push_map_as_event_on_timeout => true
            timeout_tags => ["aggregate_timeout"]
          }
        }
        if [meta][Type] =~ /RESP_IN/ {
          aggregate {
            task_id => "%{ExchangeId}"
            # second log line, so we save the message to RESP_IN
            code => "map['RESP_IN'] ||=  event.get('message')"
            map_action => "update"
            end_of_task => true
          }
        }
      }
      # decode the JSON of the messages placed in REQ_OUT and RESP_IN by the aggregate filter
      json {
        source => "REQ_OUT"
        skip_on_invalid_json => "true"
        target => "req-out"
      }
      json {
        source => "RESP_IN"
        skip_on_invalid_json => "true"
        target => "resp-in"
      }
    }

Logstash does not give an error:

    [DEBUG] 2021-04-02 15:40:45.004 [[main]>worker0] aggregate - Aggregate successful filter code execution {:code=>"\n            map['REQ_OUT'] ||= event.get('message')\n          "}    
    [DEBUG] 2021-04-02 15:40:45.005 [[main]>worker0] aggregate - Aggregate successful filter code execution {:code=>"map['RESP_IN'] ||=  event.get('message')"}

Logstash does not log an error because you set skip_on_invalid_json to true. The sample lines are not valid JSON (the REQ_OUT and RESP_IN keys are not quoted), so they do not get parsed. If you had

    ... "ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2","RESP_IN":"test2"

Then you could use

    json { source => "message" target => "meta" }
    aggregate {
        task_id => "%{[meta][ExchangeId]}"
        timeout => 15
        push_map_as_event_on_timeout => true
        code => '
            map["Address"] ||= event.get("[meta][Address]")
            map["RESP_IN"] ||= event.get("[meta][RESP_IN]")
            map["REQ_OUT"] ||= event.get("[meta][REQ_OUT]")
            map["timestamp"] ||= event.get("[meta][timestamp]")
            map["ResponseCode"] ||= event.get("[meta][ResponseCode]")
            event.cancel # If you do not want the source events
        '
        timeout_task_id_field => "ExchangeId"
    }

and get

    {
             "Address" => "https://a-link.com",
             "RESP_IN" => "test2",
          "@timestamp" => 2021-04-02T16:42:55.594Z,
           "timestamp" => "2021-04-02T05:49:45.251Z",
             "REQ_OUT" => "test1",
            "@version" => "1",
          "ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2",
        "ResponseCode" => "200"
    }
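One detail in that output: "timestamp" holds the value from the REQ_OUT line, because ||= only assigns while the entry is still empty, so the first event to arrive wins. A minimal Ruby sketch of that behaviour, using a plain Hash as a stand-in for the aggregate map (an assumption for illustration only; the two event hashes are trimmed copies of the sample lines):

```ruby
# Plain Hash standing in for the aggregate filter's map (assumption: same ||= semantics).
map = {}

# Trimmed versions of the two parsed events, in arrival order.
events = [
  { 'Type' => 'REQ_OUT', 'timestamp' => '2021-04-02T05:49:45.251Z', 'REQ_OUT' => 'test1' },
  { 'Type' => 'RESP_IN', 'timestamp' => '2021-04-02T05:50:45.534Z', 'RESP_IN' => 'test2',
    'ResponseCode' => '200' }
]

events.each do |meta|
  %w[REQ_OUT RESP_IN timestamp ResponseCode].each do |key|
    # ||= assigns only when the slot is still nil (or false),
    # so the first non-nil value seen for a key is the one that is kept.
    map[key] ||= meta[key]
  end
end

puts map['timestamp'] # timestamp of the first (REQ_OUT) event
puts map['RESP_IN']   # filled in by the second event
```

If the response timestamp is wanted as well, it would need its own key in the map rather than sharing "timestamp".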

Note that when using push_map_as_event_on_timeout the only things in the pushed event are what you added to the map.
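To check the point about invalid JSON outside Logstash, the unquoted key can be fed to any strict JSON parser. The sketch below uses Ruby's standard json library as a stand-in (Logstash uses its own parser, so this illustrates the JSON rule rather than Logstash itself). parse_or_nil is a hypothetical helper, and the lines are shortened copies of the sample input:

```ruby
require 'json'

# Shortened sample line from the question: the REQ_OUT key is not quoted.
invalid_line = '{"Type":"REQ_OUT","ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2",REQ_OUT:"test1"}'
# The same line with the key quoted.
valid_line = '{"Type":"REQ_OUT","ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2","REQ_OUT":"test1"}'

# Hypothetical helper: returns the parsed Hash, or nil when the line is not valid JSON.
def parse_or_nil(line)
  JSON.parse(line)
rescue JSON::ParserError
  nil
end

puts parse_or_nil(invalid_line).inspect   # nil: the unquoted key is rejected
puts parse_or_nil(valid_line)['REQ_OUT']  # test1
```

Temporarily removing skip_on_invalid_json from the json filter should surface the same problem in Logstash, since the event is then tagged on a parse failure instead of being passed through silently.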

Hi,

I'll test this tomorrow. But what do you do when you don't know all the fields in the json and want to put the fields in an object?

E.g., I want output something like this:

    {
        "@timestamp": "2021-04-02T16:42:55.594Z",
        "@version": "1",
        "ExchangeId": "260e06a3-9cb5-4154-bf97-637e929fa4c2",
        "resp-in": {
            "Address": "https://a-link.com",
            "RESP_IN": "test2",
            "timestamp": "2021-04-02T05:49:45.251Z",
            "fieldx": "something",
            "fieldy": "something"
        },
        "req-out": {
            "Address": "https://a-link.com",
            "REQ_OUT": "test1",
            "timestamp": "2021-04-02T05:49:45.251Z",
            "fieldA": "something",
            "fieldB": "something",
            "ResponseCode": "200"
        }
    }

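I have not tested it, but something like

    code => '
        # Assumes the parsed fields sit at the root of the event. If the json filter
        # uses target => "meta", read [meta][Type] and iterate over event.get("meta").
        type = event.get("Type")
        if type == "REQ_OUT"
            dst = "req-out"
        elsif type == "RESP_IN"
            dst = "resp-in"
        end
        if dst
            # Create the nested hash once, outside the loop, so earlier keys are kept
            map[dst] ||= {}
            event.to_hash.each { |k, v|
                unless [ "host", "path", "ExchangeId"].include? k
                    map[dst][k] = v
                end
            }
        end
    '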
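The target shape asked for above can also be sketched outside Logstash, which makes it easier to check the grouping logic on its own. This is a plain-Ruby illustration, not the aggregate filter's API: merge_by_exchange_id and DESTINATIONS are hypothetical names, and the input lines are shortened copies of the sample with the keys quoted.

```ruby
require 'json'

# Assumed mapping from the Type value to the nested object name wanted in the question.
DESTINATIONS = { 'REQ_OUT' => 'req-out', 'RESP_IN' => 'resp-in' }.freeze

# Hypothetical helper: groups parsed log lines by ExchangeId, one nested hash per Type.
def merge_by_exchange_id(lines)
  merged = Hash.new { |hash, id| hash[id] = { 'ExchangeId' => id } }
  lines.each do |line|
    data = JSON.parse(line)
    dst = DESTINATIONS[data['Type']]
    next unless dst # ignore lines of any other Type

    # Create the nested hash once, then copy every field except the shared id into it.
    target = (merged[data['ExchangeId']][dst] ||= {})
    data.each { |k, v| target[k] = v unless k == 'ExchangeId' }
  end
  merged
end

lines = [
  '{"Type":"REQ_OUT","Address":"https://a-link.com","ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2","REQ_OUT":"test1"}',
  '{"Type":"RESP_IN","ResponseCode":"200","Address":"https://a-link.com","ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2","RESP_IN":"test2"}'
]

result = merge_by_exchange_id(lines)
puts JSON.pretty_generate(result.values.first)
```

Because the fields are copied generically, unknown fields such as fieldx or fieldA end up in the right nested object without being listed by name.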

Hi,

I do get an "Aggregate successful filter code execution" message, but I just get 2 documents with only the meta object. Trying to debug:

    filter {
      json { source => "message" target => "meta" }
      aggregate {
        task_id => "%{[meta][ExchangeId]}"
        timeout => 15
        push_map_as_event_on_timeout => true
        code => '
          type = event.get("Type")
          if type == "REQ_OUT"
            dst = "req-out"
          elsif type == "RESP_IN"
            dst = "resp-in"
          end
          if dst
            event.to_hash.each { |k, v|
              map[dst] = {}
              unless [ "host", "path", "ExchangeId"].include? k
                map[dst][k] = v
              end
            }
          end
        '
        timeout_task_id_field => "ExchangeId"
        timeout_tags => ['_aggregatetimeout']
      }
    }

I did some more debugging, and for some reason the aggregate filter is not working: it is not matching on the "ExchangeId" and just produces 2 documents. I've tried to simplify the code, but with no success. The most annoying thing of all is that when I start Logstash in debug mode it does log a

    [DEBUG] 2021-04-14 06:57:14.170 [[main]>worker0] aggregate - Aggregate successful filter code execution {:code=>"\n type = event.get(\"Type\")\n map[type] = {}\n event.to_hash.each { |k, v|\n map[type][k] = v\n } \n "}

event, so it does do something. Is this a bug?

If test.txt only has two lines, why would you expect more than two events?

Hi,

Maybe I am on completely the wrong track, but I would expect 1 document (when event.cancel is active I get nothing):

    {
              "Type" => "REQ_OUT",
          "@version" => "1",
              "host" => "LGS02",
        "ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2",
        "@timestamp" => 2021-04-14T06:46:29.221Z,
              "path" => "/tmp/test.txt"
           "REQ_OUT" => {
            "timestamp" => "2021-04-02T05:50:45.534Z",
             "severity" => "INFO",
            ...
        }
           "RESP_IN" => {
               "timestamp" => "2021-04-02T05:50:44.251Z",
                "severity" => "INFO",
            "ResponseCode" => "200",
            ...
        }
          "@version" => "1",
              "host" => "LGS02",
        "ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2",
        "@timestamp" => 2021-04-14T06:46:29.247Z,
              "path" => "/tmp/test.txt"
    }

Using

    json { source => "message" target => "[@metadata][data]" remove_field => [ "message" ] }
    aggregate {
        task_id => "%{[@metadata][data][ExchangeId]}"
        timeout => 5
        push_map_as_event_on_timeout => true
        code => '
            # we want 2 nested objects in the document object REQ_OUT and RESP_IN
            type = event.get("[@metadata][data][Type]")
            map[type] = {}
            event.get("[@metadata][data]").each { |k, v|
                unless [ "ExchangeId"].include? k
                    map[type][k] = v
                end
            }
            event.to_hash.each { |k, v|
                unless [ "@version" ].include? k
                    map[type][k] = v
                end
            }
            event.cancel
        '
        timeout_task_id_field => "ExchangeId"
        timeout_tags => ['_aggregatetimeout']
    }

I get

    {
           "RESP_IN" => {
                 "RESP_IN" => "test2",
                "severity" => "INFO",
                    "path" => "/home/user/foo.txt",
                    "Type" => "RESP_IN",
            "ResponseCode" => "200",
              "@timestamp" => 2021-04-14T14:57:51.115Z,
                 "Address" => "https://a-link.com",
                    "host" => "...",
               "timestamp" => "2021-04-02T05:50:45.534Z"
        },
           "REQ_OUT" => {
              "severity" => "INFO",
                  "path" => "/home/user/foo.txt",
                  "Type" => "REQ_OUT",
               "REQ_OUT" => "test1",
            "@timestamp" => 2021-04-14T14:57:51.115Z,
               "Address" => "https://a-link.com",
                  "host" => "...",
             "timestamp" => "2021-04-02T05:50:44.251Z"
        },
        "@timestamp" => 2021-04-14T14:58:01.101Z,
          "@version" => "1",
        "ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2",
              "tags" => [
            [0] "_aggregatetimeout"
        ]
    }

Which version of logstash are you running? I wonder if you are hitting the issue mentioned in this thread.

Hi,

I'm running 7.8.0 so I guess I hit that bug. I'll try to set the pipeline.java_execution or just upgrade to the latest version.

Pretty sure this is the solution. You are a god among men! :smiley:
