Aggregation multiple log entry

Hello everyone!

I'm struggling with aggregation in Logstash. Here is my problem:

I have an API platform that logs every call, and each call generates 3 or 4 log lines. I'd like to aggregate those entries into one "big" record that contains everything I want.

Here is an example of the log records generated by one call (they all share the same correlationId):

     {
    	    "timestamp": 1556787558121,
    	   "correlationId": "66b1ca5c604be39706af29e7",
    	   "processInfo": {
    		"hostname": "a98sv068",
    		"domainId": "3c03047f-74bb-48a0-a625-a7fa4f04ef9e",
    		"groupId": "group-10",
    		"groupName": "ManagerLan",
    		"serviceId": "instance-77",
    		"serviceName": "ManagerLan",
    		"version": "v7.5.3-Internal"
    	},
    	"transactionElement": {
    		"leg": 1,
    		"duration": 12,
    		"serviceName": "SA_Toolbox_API_UAT",
    		"operation": "getClientApplicationLogo",
    		"finalStatus": null,
    		"protocolInfo": {
    			"http": {
    				"uri": "/applications/18101/logo",
    				"status": 200,
    				"statusText": "",
    				"method": "GET",
    				"vhost": null,
    				"wafStatus": 0,
    				"bytesSent": 1948,
    				"bytesReceived": 915,
    				"remoteName": "api.toolbox.SA.com",
    				"remoteAddr": "10.16.65.14",
    				"localAddr": "10.15.41.1",
    				"remotePort": "443",
    				"localPort": "45840",
    				"sslSubject": "/C=FR/ST=Hauts-de-Seine/L=Courbevoie/CN=toolbox.SA.com",
    				"authSubjectId": "Pass Through"
    			},
    			"recvHeader": "HTTP\/1.1 200 \r\nCache-Control: max-age=7200, private\r\nX-Content-Type-Options: nosniff\r\nX-XSS-Protection: MT\r\nConnection: close\r\n\r\n",
    			"sentHeader": "GET \/applications\/18101\/logo HTTP\/1.1\r\nHost: api.toolbox.SA.com\r\nMax-Forwarfr-FR\r\"",
    			"recvPayload": null,
    			"sentPayload": null
    		}
    	}
    } 

--

{
    	"timestamp": 1556787558134,
    	"correlationId": "66b1ca5c604be39706af29e7",
    	"processInfo": {
    		"hostname": "a98sv068",
    		"domainId": "3c03047f-74bb-48a0-a625-a7fa4f04ef9e",
    		"groupId": "group-10",
    		"groupName": "ManagerLan",
    		"serviceId": "instance-77",
    		"serviceName": "ManagerLan",
    		"version": "v7.5.3-Internal"
    	},
    	"circuitPath": [{
    			"policy": "API Broker",
    			"execTime": 19,
    			"filters": [{
    					"name": "Set service context",
    					"type": "ApiServiceContextFilter",
    					"class": "com.vordel.coreapireg.runtime.broker.ApiServiceContextFilter",
    					"status": "Pass",
    					"filterTime": 1556787558114,
    					"execTime": 0
    				}, {
    					"name": "Connect to URL",
    					"type": "VApiConnectToURLFilter",
    					"class": "com.vordel.circuit.vapi.VApiConnectToURLFilter",
    					"status": "Pass",
    					"filterTime": 1556787558133,
    					"execTime": 19
    				}
    			]
    		}
    	]
    } 

--

{
    	"timestamp": 1556787558134,
    	"correlationId": "66b1ca5c604be39706af29e7",
    	"processInfo": {
    		"hostname": "a98sv068",
    		"domainId": "3c03047f-74bb-48a0-a625-a7fa4f04ef9e",
    		"groupId": "group-10",
    		"groupName": "ManagerLan",
    		"serviceId": "instance-77",
    		"serviceName": "ManagerLan",
    		"version": "v7.5.3-Internal"
    	},
    	"transactionSummary": {
    		"path": "/SA/toolbox/applications/{id}/logo",
    		"protocol": "https",
    		"protocolSrc": "8065",
    		"status": "success",
    		"serviceContexts": [{
    				"service": "SA_Toolbox_API_UAT",
    				"monitor": true,
    				"client": "Pass Through",
    				"org": null,
    				"app": null,
    				"method": "getClientApplicationLogo",
    				"status": "success",
    				"duration": 20
    			}
    		]
    	}
    }

--

 {
    	"timestamp": 1556787558113,
    	"correlationId": "66b1ca5c604be39706af29e7",
    	"processInfo": {
    		"hostname": "a98sv068",
    		"domainId": "3c03047f-74bb-48a0-a625-a7fa4f04ef9e",
    		"groupId": "group-10",
    		"groupName": "ManagerLan",
    		"serviceId": "instance-77",
    		"serviceName": "ManagerLan",
    		"version": "v7.5.3-Internal"
    	},
    	"transactionElement": {
    		"leg": 0,
    		"duration": 21,
    		"serviceName": "SA_Toolbox_API_UAT",
    		"operation": "getClientApplicationLogo",
    		"finalStatus": "Pass",
    		"protocolInfo": {
    			"http": {
    				"uri": "/SA/toolbox/applications/18101/logo",
    				"status": 200,
    				"statusText": "",
    				"method": "GET",
    				"vhost": null,
    				"wafStatus": 0,
    				"bytesSent": 987,
    				"bytesReceived": 1914,
    				"remoteName": "127.0.0.1",
    				"remoteAddr": "127.0.0.1",
    				"localAddr": "127.0.0.1",
    				"remotePort": "60719",
    				"localPort": "8066",
    				"sslSubject": null,
    				"authSubjectId": "Pass Through"
    			},
    			"recvHeader": "GET \/SA\/toos: 20\r\nVia: 1.1 a98sv068()AAAvjucYgm\/\n",
    			"sentHeader": "HTTP\/1.=block\r\nContent-Type: image\/png\r\n\r\n",
    			"recvPayload": null,
    			"sentPayload": null
    		}
    	}
       }

Of course, the values in this log have been altered :wink:

My goal is to create a single record with

four big parts:

  1. obviously the common part:

    "correlationId": "66b1ca5c604be39706af29e7",
    "processInfo": {
    	"hostname": "a98sv068",
    	"domainId": "3c03047f-74bb-48a0-a625-a7fa4f04ef9e",
    	"groupId": "group-10",
    	"groupName": "ManagerLan",
    	"serviceId": "instance-77",
    	"serviceName": "ManagerLan",
    	"version": "v7.5.3-Internal"
    }
    
  2. the part containing the "transactionElement" information

  3. the part containing the "circuitPath" information

  4. the part containing the "transactionSummary" information

It looks like my current settings are not working well with Logstash :confused:

Does anyone have an idea of how to approach this properly?

thanks !

What have you tried?

For the moment I'm "experimenting", and here is where I am:

# First attempt: parse each JSON log line into [json], copy selected values into
# top-level api_* fields, then merge all events sharing a correlationId into one
# map with the aggregate filter.
filter {

json {
	source => "message"
	target => "json"
}
# NOTE(review): the sample records use the key "timestamp" (lowercase "s"); field
# references are case-sensitive, so [json][timeStamp] never matches and
# @timestamp keeps the ingest time. Should be "[json][timestamp]".
date {
	match => ["[json][timeStamp]", "UNIX_MS"]
	target => "@timestamp"
}

# Fields common to every record of a call; api_correlationId is the aggregate task_id.
mutate {
	add_field => {
		"api_correlationId" => "%{[json][correlationId]}"
		"api_hostname" => "%{[json][processInfo][hostname]}"
		"api_instance" => "%{[json][processInfo][groupName]}"
		"api_service_name" => "%{[json][processInfo][serviceName]}"
	}
}

if ([json][transactionElement]) {
	mutate {
		add_field => {

			"api_name" => "%{[json][transactionElement][serviceName]}"
			"api_duration" => "%{[json][transactionElement][duration]}"
			"api_detail_uri" => "%{[json][transactionElement][protocolInfo][http][uri]}"
			"api_detail_rest_status_code" => "%{[json][transactionElement][protocolInfo][http][status]}"
			"api_detail_method" => "%{[json][transactionElement][protocolInfo][http][method]}"
			"api_detail_received_header" => "%{[json][transactionElement][protocolInfo][recvHeader]}"
			"api_detail_sent_header" => "%{[json][transactionElement][protocolInfo][sentHeader]}"
		}
	}
}
if ([json][transactionSummary]) {
	mutate {
		add_field => {
			"api_endpoint_path" => "%{[json][transactionSummary][path]}"
			"api_status" => "%{[json][transactionSummary][status]}"
		}
	}
}
# NOTE(review): circuitPath is an array in the sample records, so these references
# need an index, e.g. [json][circuitPath][0][policy] and
# [json][circuitPath][0][filters][0][name]; as written they do not point at the
# array elements.
if ([json][circuitPath]) {
	mutate {
		add_field => {
			"api_policy_name" => "%{[json][circuitPath][policy]}"
			"api_policy_Exec_Time" => "%{[json][circuitPath][execTime]}"
			"api_filter_name" => "%{[json][circuitPath][filters][0][name]}"
			"api_filter_type" => "%{[json][circuitPath][filters][0][type]}"
			"api_filter_status" => "%{[json][circuitPath][filters][0][status]}"
			"api_filter_exec_time" => "%{[json][circuitPath][filters][0][execTime]}"
		}
	}
}

# Merge the events of one correlationId into a single map. event.cancel() at the
# end of the code suppresses each original event; the accumulated map is emitted
# as a new event (push_previous_map_as_event) when a different task_id arrives.
# NOTE(review): every map section is reassigned with "=" on every event, including
# events that lack the corresponding fields, so values set by one event are
# replaced by nil from the next event that lacks them. Assign each section only
# when its source field is present.
aggregate {
	task_id => "%{api_correlationId}"
	code => "
	map['MapBaseElement']={
	'conca_api_correlationId' => event.get('api_correlationId'),
	'conca_api_hostname' => event.get('api_hostname'),
	'conca_api_instance' => event.get('api_instance'),
	'conca_api_service_name' => event.get('api_service_name')
	}
	map['MapTransactionElement'] = {
	'conca_api_name' => event.get('api_name'),
	'conca_api_duration' => event.get('api_duration'),
	'conca_api_detail_uri' => event.get('api_detail_uri'),
	'conca_api_detail_rest_status_code' => event.get('api_detail_rest_status_code'),
	'conca_api_detail_method' => event.get('api_detail_method'),
	'conca_api_detail_received_header' => event.get('api_detail_received_header'),
	'conca_api_detail_sent_header' => event.get('api_detail_sent_header')
	}

	map['MapCircuitPath'] = {
	'conca_api_policy_name' => event.get('api_policy_name'),
	'conca_api_policy_Exec_Time' => event.get('api_policy_Exec_Time'),
	'conca_api_filter_name' => event.get('api_filter_name'),
	'conca_api_filter_type' => event.get('api_filter_type'),
	'conca_api_filter_status' => event.get('api_filter_status'),
	'conca_api_filter_exec_time' => event.get('api_filter_exec_time')
	}

	map['MapTransactionSummary'] = {
	'conca_api_endpoint_path' => event.get('api_endpoint_path'),
	'conca_api_status' => event.get('api_status')
	}

	##//	map['api_correlationId'] = event.get('api_correlationId')
	##//	map['api_name'] ||= []
	##//	map['api_name'] << {'serviceName' => event.get('api_name')}


	event.cancel()
	"
	push_previous_map_as_event => true
	timeout => 15
}
}
# NOTE(review): stray closing brace with no matching opening brace; remove it.
}

Note that circuitPath is an array, so you need to refer to %{[json][circuitPath][0]...}

Your main issue is that you are setting every map element for every event, but the source fields do not exist on every event, so you need to make the assignments conditional. For example:

if event.get('api_name')
    map['MapTransactionElement'] = {
        'conca_api_name' => event.get('api_name'), [...]

Hi Badger,

Yes, I see that I have a problem. For reference, here is the result I got:

{
  "_index": "api-test-logstash-2019.19",
  "_type": "doc",
  "_id": "3VkZjGoBWdaL9cShPRpo",
  "_version": 1,
  "_score": null,
  "_source": {
    "MapBaseElement": {
      "conca_api_correlationId": "09a4ca5c0bfde2b3d13e0b24",
      "conca_api_hostname": "a98sv068api1p",
      "conca_api_instance": "ManagerLan",
      "conca_api_service_name": "ManagerLan"
    },
    "MapTransactionSummary": {
      "conca_api_endpoint_path": "/sg/groupdirectory/v3/users/{sgd}",
      "conca_api_status": "success"
    },
    "MapCircuitPath": {
      "conca_api_filter_exec_time": null,
      "conca_api_policy_Exec_Time": null,
      "conca_api_policy_name": null,
      "conca_api_filter_status": null,
      "conca_api_filter_name": null,
      "conca_api_filter_type": null
    },
    "@version": "1",
    "MapTransactionElement": {
      "conca_api_duration": null,
      "conca_api_detail_received_header": null,
      "conca_api_name": null,
      "conca_api_detail_uri": null,
      "conca_api_detail_method": null,
      "conca_api_detail_sent_header": null,
      "conca_api_detail_rest_status_code": null
    },
    "@timestamp": "2019-05-06T07:42:39.029Z"
  },
  "fields": {
    "@timestamp": [
      "2019-05-06T07:42:39.029Z"
    ]
  },
  "highlight": {
    "MapBaseElement.conca_api_correlationId": [
      "@kibana-highlighted-field@09a4ca5c0bfde2b3d13e0b24@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1557128559029
  ]
}

and

{
  "_index": "api-test-logstash-2019.19",
  "_type": "doc",
  "_id": "yVkZjGoBWdaL9cShPRpo",
  "_version": 1,
  "_score": null,
  "_source": {
    "MapBaseElement": {
      "conca_api_correlationId": "09a4ca5c0bfde2b3d13e0b24",
      "conca_api_hostname": "a98sv068api1p",
      "conca_api_instance": "ManagerLan",
      "conca_api_service_name": "ManagerLan"
    },
    "MapTransactionSummary": {
      "conca_api_endpoint_path": null,
      "conca_api_status": null
    },
    "MapCircuitPath": {
      "conca_api_filter_exec_time": null,
      "conca_api_policy_Exec_Time": null,
      "conca_api_policy_name": null,
      "conca_api_filter_status": null,
      "conca_api_filter_name": null,
      "conca_api_filter_type": null
    },
    "@version": "1",
    "MapTransactionElement": {
      "conca_api_duration": "194",
      "conca_api_detail_received_header": "GET /sg/groupdirectory/v3arded-For: 10.87.\n\r\n",
      "conca_api_name": "SG_GroupDirectory API V3 PROD",
      "conca_api_detail_uri": "/sg/groupdirectory/v3/users/A891",
      "conca_api_detail_method": "GET",
      "conca_api_detail_sent_header": "HTTP/1.1 200 OK\r\nMax-Forwards:=UTF-8\r\n\r\n",
      "conca_api_detail_rest_status_code": "200"
    },
    "@timestamp": "2019-05-06T07:42:38.951Z"
  },
  "fields": {
    "@timestamp": [
      "2019-05-06T07:42:38.951Z"
    ]
  },
  "highlight": {
    "MapBaseElement.conca_api_correlationId": [
      "@kibana-highlighted-field@09a4ca5c0bfde2b3d13e0b24@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1557128558951
  ]
}

As you can see, and as we could have imagined, we get multiple entries, each with some empty parts.

My goal would be to get all the information in one log.

Hi again @Badger,

OK, I think I've got it, or at least I'm closer. Here is my filter:

# Second attempt: same field extraction, but each record type writes only its own
# section of the aggregate map, and events already merged are dropped.
filter {

	mutate {
		remove_field => ["type", "input_type", "host"]
	}
	json {
		source => "message"
		target => "json"
	}
	# The records carry epoch milliseconds under "timestamp" (lowercase "s"). Field
	# references are case-sensitive: "[json][timeStamp]" never matched, so
	# @timestamp silently kept the ingest time instead of the call time.
	date {
		match => ["[json][timestamp]", "UNIX_MS"]
		target => "@timestamp"
	}

	# Fields shared by every record of a call; api_correlationId is the aggregate task_id.
	mutate {
		add_field => {
			"api_correlationId" => "%{[json][correlationId]}"
			"api_hostname" => "%{[json][processInfo][hostname]}"
			"api_instance" => "%{[json][processInfo][groupName]}"
			"api_service_name" => "%{[json][processInfo][serviceName]}"
		}
	}

	if ([json][transactionElement]) {
		mutate {
			add_field => {
				"api_name" => "%{[json][transactionElement][serviceName]}"
				"api_duration" => "%{[json][transactionElement][duration]}"
				"api_detail_uri" => "%{[json][transactionElement][protocolInfo][http][uri]}"
				"api_detail_rest_status_code" => "%{[json][transactionElement][protocolInfo][http][status]}"
				"api_detail_method" => "%{[json][transactionElement][protocolInfo][http][method]}"
				"api_detail_received_header" => "%{[json][transactionElement][protocolInfo][recvHeader]}"
				"api_detail_sent_header" => "%{[json][transactionElement][protocolInfo][sentHeader]}"
			}
		}
	}
	if ([json][transactionSummary]) {
		mutate {
			add_field => {
				"api_endpoint_path" => "%{[json][transactionSummary][path]}"
				"api_status" => "%{[json][transactionSummary][status]}"
			}
		}
	}
	# circuitPath is an array, hence the [0] indexes below.
	if ([json][circuitPath]) {
		mutate {
			add_field => {
				"api_policy_name" => "%{[json][circuitPath][0][policy]}"
				"api_policy_Exec_Time" => "%{[json][circuitPath][0][execTime]}"
				"api_filter_name" => "%{[json][circuitPath][0][filters][0][name]}"
				"api_filter_type" => "%{[json][circuitPath][0][filters][0][type]}"
				"api_filter_status" => "%{[json][circuitPath][0][filters][0][status]}"
				"api_filter_exec_time" => "%{[json][circuitPath][0][filters][0][execTime]}"
			}
		}
	}

	# Route each record type into its own section of the per-correlationId map.
	# NOTE(review): the sample records include two transactionElement records per
	# call (leg 0 and leg 1). The "=" below overwrites the section, so only the last
	# leg processed survives. Appending (map['MapTransactionElement'] ||= [] then
	# <<) would keep both.
	# NOTE(review): the aggregate plugin documentation requires a single pipeline
	# worker (pipeline.workers: 1) for events to be merged reliably. Confirm this
	# in the pipeline settings.
	if ([json][transactionElement]) {
		aggregate {
			task_id => "%{api_correlationId}"
			code => "
			map['MapTransactionElement'] = {
			'conca_api_correlationId' => event.get('api_correlationId'),
			'conca_api_hostname' => event.get('api_hostname'),
			'conca_api_instance' => event.get('api_instance'),
			'conca_api_service_name' => event.get('api_service_name'),

			'conca_api_name' => event.get('api_name'),
			'conca_api_duration' => event.get('api_duration'),
			'conca_api_detail_uri' => event.get('api_detail_uri'),
			'conca_api_detail_rest_status_code' => event.get('api_detail_rest_status_code'),
			'conca_api_detail_method' => event.get('api_detail_method'),
			'conca_api_detail_received_header' => event.get('api_detail_received_header'),
			'conca_api_detail_sent_header' => event.get('api_detail_sent_header')
			}"
		}
		drop {}
	} else if ([json][circuitPath]) {
		aggregate {
			task_id => "%{api_correlationId}"
			code => "
			map['MapCircuitPath'] = {
			'conca_api_policy_name' => event.get('api_policy_name'),
			'conca_api_policy_Exec_Time' => event.get('api_policy_Exec_Time'),
			'conca_api_filter_name' => event.get('api_filter_name'),
			'conca_api_filter_type' => event.get('api_filter_type'),
			'conca_api_filter_status' => event.get('api_filter_status'),
			'conca_api_filter_exec_time' => event.get('api_filter_exec_time')
			}"
		}
		drop {}
	} else if ([json][transactionSummary]) {
		# NOTE(review): this branch neither drops nor cancels the summary event. The
		# raw event, still carrying the api_* fields added above, is therefore
		# emitted in addition to the map pushed later by push_previous_map_as_event,
		# which gives a second document per call.
		aggregate {
			task_id => "%{api_correlationId}"
			code => "
			map['MapTransactionSummary'] = {
			'conca_api_endpoint_path' => event.get('api_endpoint_path'),
			'conca_api_status' => event.get('api_status')
			}"

			push_previous_map_as_event => true
			timeout => 60
		}
	}

	mutate {
		remove_tag => ["beats_input_codec_plain_applied", "beats_input_codec_json_applied"]
		remove_field => ["json", "beat"]
	}

}

But I can see that my logs are coming in twice.
I think these mutations:

# Quoted from the filter above: copies the correlation id and three processInfo
# values into top-level api_* fields on every event.
# NOTE(review): these fields only reach the index on events that are never dropped
# or cancelled afterwards (in that filter, the transactionSummary events). The
# duplicate documents presumably come from that missing drop rather than from
# this mutate itself. Confirm against the pipeline output.
mutate {
	add_field => {
		"api_correlationId" => "%{[json][correlationId]}"
		"api_hostname" => "%{[json][processInfo][hostname]}"
		"api_instance" => "%{[json][processInfo][groupName]}"
		"api_service_name" => "%{[json][processInfo][serviceName]}"
	}
}

create one set of logs at the beginning, and then my map writes a second set. How can I avoid creating the first set? Or can I reference the log values in the map without creating the fields beforehand?

You can read nested fields directly inside the aggregate code, without creating top-level fields first:

# Inside an aggregate filter's code option: read a nested field straight from the
# event and copy it into the task map only when it is present. This avoids
# creating an intermediate top-level field with mutate/add_field.
f = event.get("[json][processInfo][groupName]")
if f
    map["processInfo-groupName"] = f
end

Well, I changed it to something like this:

# Excerpt of the reworked transactionElement branch. It reads the nested fields
# directly and appends one entry per record to an array in the map.
# NOTE(review): the excerpt ends without the closing brace of this if block.
# NOTE(review): this branch only handles transactionElement events. The
# "untreated" document quoted below is a transactionSummary event, so the branches
# handling summary (and circuitPath) events need the same cancel/drop. Otherwise
# their raw events are still indexed.
if ([json][transactionElement]) {
		# NOTE(review): add_tag => ["treated"] is mutate's dedicated option for tagging.
		mutate {
			add_field => {
				tags => "treated"
			}
		}
		aggregate {
			task_id => "%{[json][correlationId]}"
			code => "
			map['MapTransactionElement'] ||= []
			map['MapTransactionElement'] << {
			'conca_api_correlationId' => event.get('[json][correlationId]'),
			'conca_api_hostname' => event.get('[json][processInfo][hostname]'),
			'conca_api_instance' => event.get('[json][processInfo][groupName]'),
			'conca_api_service_name' => event.get('[json][processInfo][serviceName]')
}
		event.cancel()
		"
	}
	# Presumably redundant: event.cancel() in the aggregate code already cancels the event.
	drop {}

But even with this, I still see all the untreated logs in my Kibana index,
like this one:

{
  "_index": "api-test-logstash-2019.19",
  "_type": "doc",
  "_id": "pXjbjWoBWdaL9cShGlkq",
  "_version": 1,
  "_score": null,
  "_source": {
    "@version": "1",
    "prospector": {
      "type": "log"
    },
    "@timestamp": "2019-05-06T15:54:00.593Z",
    "tags": [
      "treated"
    ],
    "source": "/home/selfdeploy/Manager__LAN/group-9_instance-76_traffic_2019-05-02-21.log",
    "input": {
      "type": "log"
    },
    "log": {
      "file": {
        "path": "/home/selfdeploy/Manager__LAN/group-9_instance-76_traffic_2019-05-02-21.log"
      }
    },
    "offset": 20660340,
    "app": "api",
    "message": "{\"timestamp\":1556784659459,\"correlationId\":\"13a6ca5c3d08274dfab2c0df\",\"processInfo\":{\"hostname\":\"a98sv068api1p\",\"domainId\":\"3c03047f-74bb-48a0-a625-a7fa4f04ef9e\",\"groupId\":\"group-9\",\"groupName\":\"FiltrageLan\",\"serviceId\":\"instance-76\",\"serviceName\":\"FiltrageLan\",\"version\":\"v7.5.3-Internal\"},\"transactionSummary\":{\"path\":\"/\", \"protocol\":\"https\", \"protocolSrc\":\"8443\", \"status\":\"success\", \"serviceContexts\":[]}}"
  },
  "fields": {
    "@timestamp": [
      "2019-05-06T15:54:00.593Z"
    ]
  },
  "sort": [
    1557158040593
  ]
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.