Aggregate filter to remember a field across lines

I have to remember the date from one line and use it while processing the event_timestamp field on other lines of the log file. For this I am using the aggregate filter. Please tell me what I am doing wrong here, or whether there is another way to do it.

My desired output is that event_timestamp should contain "<date> <time>", where <date> is the remembered value from the TIMESTAMP line and <time> is the event time of the message in the log file.

My data :
14:03:03 (lmgrd) TIMESTAMP 5/13/2019
15:56:24 (cdslmd) OUT: "PKC0603" mnarasim@dlhsx00012
15:56:32 (cdslmd) OUT: "Analog_Design_Environment_XL" gautamd@dlhsx00005

My config :
filter {
# First pass: recognise "TIMESTAMP" lines and capture their date into "monthday".
# Any line that does not match this pattern is tagged "message_data" instead.
grok {
match => [ "message", "%{DATA:event_timestamp} (%{DATA:lmgrd}) TIMESTAMP %{DATE:monthday}" ]
tag_on_failure => [ "message_data" ]
}

# Give every event the same constant taskId so that all events use one aggregate task id.
mutate {
	add_field => { "taskId" => "all" }
	}

# TIMESTAMP line (grok above matched): store its date in the aggregate map.
if "message_data" not in [tags] {
	aggregate {
		task_id => "%{taskId}"
		code => "map['monthday'] = event.get('monthday')"
				}
}
# Any other line: try to copy the remembered date onto the event and parse the message.
else {
	# NOTE: "try" here is a local variable inside the Ruby code string; this code
	# sets the event field "monthday", it does not create an event field named "try".
	aggregate {
		task_id => "%{taskId}"
		code => "try = event.set('monthday', map['monthday'])"
		map_action => "update"
		}
	# Parse the licence event line (time, vendor, event type, feature, user, host).
	grok { 
		 match => [ "message", "%{TIME:event_timestamp} \(%{DATA:lic_vendor_name}\) (?<event_type>(OUT|IN|DENIED))\: \"%{DATA:lic_feature_name}\" %{DATA:user_name}@%{HOSTNAME:host_name}" ] }
	
	# "%{try}" references an event field named "try"; nothing in this config sets
	# such a field, which is why the posted output shows the literal text "%{try}".
	mutate { replace => ["event_timestamp", "%{event_timestamp} %{try}" ] }
}	

My output
"event_timestamp" => "15:56:24 %{try}"

      "user_name" => "gautamd",
          "source" => "/home/msk/cadence-cdslmd-dlhl0939_dlhl0940_dlhl0941.log",
         "message" => "15:56:32 (cdslmd) OUT: \"Analog_Design_Environment_XL\" gautamd@dlhsx00005  ",
            "beat" => {
        "name" => "dlhl2117",
    "hostname" => "dlhl2117",
     "version" => "6.4.2"
},
      "@timestamp" => 2019-06-07T08:16:28.915Z,
            "host" => {
    "name" => "dlhl2117"
},
          "fields" => {
    "document_type" => "fle-type"
},
          "taskId" => "all",
"lic_feature_name" => "Analog_Design_Environment_XL",
          "offset" => 92,
 "event_timestamp" => "15:56:32 %{try}"

}
{
"event_type" => "OUT",
"@version" => "1",
"host_name" => "dlhsx00012",
"prospector" => {
"type" => "log"

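You are assigning the return value of event.set to a variable called try, which is a local Ruby variable that exists only inside the aggregate filter's code block; it is never stored on the event, so %{try} in the mutate cannot be resolved. I think you want something more like: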

    # Recognise "TIMESTAMP" lines and capture their date into "monthday";
    # lines that do not match are tagged "message_data".
    grok {
        match => [ "message", "%{DATA:event_timestamp} \(%{DATA:lmgrd}\) TIMESTAMP %{DATE:monthday}" ]
        tag_on_failure => [ "message_data" ]
    }

    # Constant taskId so that every event uses the same aggregate task id.
    mutate {
        add_field => { "taskId" => "all" }
    }

    if "message_data" not in [tags] {
        # TIMESTAMP line: remember its date in the aggregate map.
        aggregate {
            task_id => "%{taskId}"
            code => "map['monthday'] = event.get('monthday')"
        }
    } else {
        # Other lines: copy the remembered date onto the event as the field
        # "monthday" (no intermediate Ruby variable).
        aggregate {
            task_id => "%{taskId}"
            code => "event.set('monthday', map['monthday'])"
            map_action => "update"
        }
        # Parse the licence event line (time, vendor, event type, feature, user, host).
        grok { match => [ "message", "%{TIME:event_timestamp} \(%{DATA:lic_vendor_name}\) (?<event_type>(OUT|IN|DENIED))\: \"%{DATA:lic_feature_name}\" %{DATA:user_name}@%{HOSTNAME:host_name}" ] }
    }
    # Placed after the if/else, so it runs for every event: append the event's
    # "monthday" field to its "event_timestamp".
    mutate { replace => ["event_timestamp", "%{event_timestamp} %{monthday}" ] }

I modified the logstash config slightly but still the aggregate filter is unable to remember the value of map['monthday']. Here is my update :

My data which is in filename "cadence-cdslmd-dlhl0939_dlhl0940_dlhl0941-5.11.2019-event.log"

15:56:24 (cdslmd) OUT: "PKC0603" mnarasim@dlhsx00012
15:56:32 (cdslmd) OUT: "Analog_Design_Environment_XL" gautamd@dlhsx00005
15:56:33 (lmgrd) TIMESTAMP 5/13/2019
15:56:35 (cdslmd) OUT: "OASIS_Simulation_Interface" mnarasim@dlhsx00012
15:56:37 (cdslmd) IN: "111" guptasri@dlhsx00010
15:56:42 (cdslmd) OUT: "PKC0603" mnarasim@dlhsx00012

My config
filter {

# Recognise "TIMESTAMP" lines and capture their date into "monthday";
# lines that do not match are tagged "message_data".
grok {
	match => [ "message", "%{DATA:event_timestamp} \(%{DATA:lmgrd}\) TIMESTAMP %{DATE:monthday}" ]
	tag_on_failure => [ "message_data" ]
		}
# Constant taskId so that every event uses the same aggregate task id.
mutate {
	add_field => { "taskId" => "all" }
	}

# TIMESTAMP line: remember its date in the aggregate map.
if "message_data" not in [tags] {
	aggregate {
		task_id => "%{taskId}"
		code => "map['monthday'] = event.get('monthday')"
				}
}
else {
		# Use the remembered date when the map has one; otherwise fall back to the
		# fourth "-"-separated piece of the "source" field (index 3).
		# NOTE(review): for the file name quoted in the post that piece is "5.11.2019",
		# assuming the directory part of the path contains no "-" - confirm.
		aggregate {
		task_id => "%{taskId}"
		code => "if (map['monthday'] != nil) then event.set('monthday', map['monthday']) else event.set('monthday', (event.get('source').split('-')[3])) end"
		}

	# Parse the licence event line (time, vendor, event type, feature, user, host).
	grok { 
		 match => [ "message", "%{TIME:event_timestamp} \(%{DATA:lic_vendor_name}\) (?<event_type>(OUT|IN|DENIED))\: \"%{DATA:lic_feature_name}\" %{DATA:user_name}@%{HOSTNAME:host_name}" ] }
	
	# Build "<date> <time>" in event_timestamp from the two fields set above.
	mutate { replace => ["event_timestamp", "%{monthday} %{event_timestamp}" ] }
	
	# Parse the combined string into a timestamp, written back to event_timestamp.
	# The "/" format fits dates taken from TIMESTAMP lines, the "." format fits
	# dates taken from the file name.
	date { match => [ "event_timestamp", "M/d/yyyy HH:mm:ss","M.d.yyyy HH:mm:ss","ISO8601" ] target => "event_timestamp" }
}	

}
# Print every processed event to stdout using the rubydebug codec.
output {
stdout {
codec => rubydebug
}
}

In the output corresponding to input line 5 and line 6
15:56:37 (cdslmd) IN: "111" guptasri@dlhsx00010
15:56:42 (cdslmd) OUT: "PKC0603" mnarasim@dlhsx00012

The aggregate filter is not able to remember the map['monthday'] of previous lines i.e. "5/13/2019".
Output corresponding to input line 5 "event_timestamp" => 2019-05-11T10:26:37.000Z,
Output corresponding to input line 6 "event_timestamp" => 2019-05-11T10:26:42.000Z,

The expected output of
Line 5 should be : "event_timestamp" => 2019-05-13T10:26:37.000Z,
Line 6 should be : "event_timestamp" => 2019-05-13T10:26:42.000Z,

Am I missing something?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.