Logstash Aggregate plugin is skipping some events

Good day everyone!
I need some help because I can't understand why the aggregate filter is not working properly. The thing is that some events are simply not aggregated, even though their IDs are the same (in my case the ID is the web.trace.id field). The events are collected by Filebeat on a Windows server. You can see all the configs and example log lines below.
Logs examples:

{"@timestamp":"2024-04-26T15:51:55.805+03:00","@version":"1","message":"->qty0=3, ","log.logger":"RF","thread_name":"RFWS: 4360","log.level":"INFO","level_value":20000,"web.trace.id":"a8b3e2bb-e9db-4d1b-971f-ac2766e57c39","instock.rf.device":1361,"instock.rf.screenName":"pick.Picking.getQty","instock.rf.login":"21187","instock.rf.isRequest":true,"instock.rf.answerBack":"4360","instock.rf.fork":"FORK403","instock.rf.process":"com.instock.server.rf.ship.LPCombine","service.environment":"AA_PROD_MASH_v7","instock.serviceset":"rf","pod.name":"wms-prod-mash.rf[0]"}
{"@timestamp":"2024-04-26T15:51:55.856+03:00","@version":"1","message":"<-title=Cont.g - 21187, msg=no entry","log.logger":"RF","thread_name":"RFWS: 4360","log.level":"INFO","level_value":20000,"web.trace.id":"a8b3e2bb-e9db-4d1b-971f-ac2766e57c39","instock.rf.elapsedTime":40,"instock.rf.device":1361,"instock.rf.screenName":"pick.Picking.getQty","instock.rf.login":"21187","instock.rf.isRequest":false,"instock.rf.answerBack":"4360","instock.rf.fork":"FORK403","instock.rf.process":"com.instock.server.rf.ship.LPCombine","service.environment":"AA_PROD_MASH_v7","instock.serviceset":"rf","pod.name":"wms-prod-mash.rf[0]"}
{"@timestamp":"2024-04-26T10:07:17.508+03:00","@version":"1","message":"->list=, action=F1, ","log.logger":"RF","thread_name":"RFWS: 4373","log.level":"INFO","level_value":20000,"web.trace.id":"f1dd7b80-cbf0-4201-9d95-24a0d34679f9","instock.rf.device":1374,"instock.rf.screenName":"generic.Generic.selectFromList","instock.rf.login":"14113","instock.rf.isRequest":true,"instock.rf.answerBack":"4373","instock.rf.fork":"FORK387","instock.rf.process":"com.instock.server.rf.task.GeneralWork","service.environment":"AA_PROD_MASH_v7","instock.serviceset":"rf","pod.name":"wms-prod-mash.rf[0]"}
{"@timestamp":"2024-04-26T10:07:17.591+03:00","@version":"1","message":"<-title=dispLoc=U-11-06-01-03, inputLoc=, F10=input case","log.logger":"RF","thread_name":"RFWS: 4373","log.level":"INFO","level_value":20000,"web.trace.id":"f1dd7b80-cbf0-4201-9d95-24a0d34679f9","instock.rf.elapsedTime":79,"instock.rf.device":1374,"instock.rf.screenName":"pick.Picking.enterSourceLocation","instock.rf.login":"14113","instock.rf.isRequest":false,"instock.rf.answerBack":"4373","instock.rf.fork":"FORK387","instock.rf.process":"com.instock.server.rf.task.GeneralWork","service.environment":"AA_PROD_MASH_v7","instock.serviceset":"rf","pod.name":"wms-prod-mash.rf[0]"}
{"@timestamp":"2024-04-26T10:07:17.399+03:00","@version":"1","message":"->inputSKU=VR6150930, ","log.logger":"RF","thread_name":"RFWS: 4417","log.level":"INFO","level_value":20000,"web.trace.id":"f05489ae-ca83-4804-8b15-98e9c70eccf7","instock.rf.device":1418,"instock.rf.screenName":"pick.Picking.enterSKU","instock.rf.login":"18755","instock.rf.isRequest":true,"instock.rf.answerBack":"4417","instock.rf.fork":"FORK274","instock.rf.process":"com.instock.server.rf.task.GeneralWork","service.environment":"AA_PROD_MASH_v7","instock.serviceset":"rf","pod.name":"wms-prod-mash.rf[0]"}
{"@timestamp":"2024-04-26T10:07:17.607+03:00","@version":"1","message":"<-title=barcodeQty=1, msg=Scan:, barcode=, F3=Ready, ","log.logger":"RF","thread_name":"RFWS: 4417","log.level":"INFO","level_value":20000,"web.trace.id":"f05489ae-ca83-4804-8b15-98e9c70eccf7","instock.rf.elapsedTime":303,"instock.rf.device":1418,"instock.rf.screenName":"pick.Picking.enterTempBarcode","instock.rf.login":"18755","instock.rf.isRequest":false,"instock.rf.answerBack":"4417","instock.rf.fork":"FORK274","instock.rf.process":"com.instock.server.rf.task.GeneralWork","service.environment":"AA_PROD_MASH_v7","instock.serviceset":"rf","pod.name":"wms-prod-mash.rf[0]"}
{"@timestamp":"2024-04-25T18:27:43.319+03:00","@version":"1","message":"->receiptLoc=VK-01-2-34, ","log.logger":"RF","thread_name":"RFWS: 1067","log.level":"INFO","level_value":20000,"web.trace.id":"354d8219-ce49-4214-ae2e-e912b74bb820","instock.rf.device":2139,"instock.rf.screenName":"receiving.Receiving.getReceiptLoc","instock.rf.login":"21148","instock.rf.isRequest":true,"instock.rf.answerBack":"1067","instock.rf.fork":"FORK306","instock.rf.process":"com.instock.server.rf.ship.LPSortPickedByControlZone","service.environment":"AA_PROD_MASH_v7","instock.serviceset":"rf","pod.name":"wms-prod-mash.rf[0]"}
{"@timestamp":"2024-04-25T18:27:43.474+03:00","@version":"1","message":"<-title=sort - 21148, msg=enter box, prompt=control:, input=, ","log.logger":"RF","thread_name":"RFWS: 1067","log.level":"INFO","level_value":20000,"web.trace.id":"354d8219-ce49-4214-ae2e-e912b74bb820","instock.rf.elapsedTime":151,"instock.rf.device":2139,"instock.rf.screenName":"inventory.Inventory.getLP","instock.rf.login":"21148","instock.rf.isRequest":false,"instock.rf.answerBack":"1067","instock.rf.fork":"FORK306","instock.rf.process":"com.instock.server.rf.ship.LPSortPickedByControlZone","service.environment":"AA_PROD_MASH_v7","instock.serviceset":"rf","pod.name":"wms-prod-mash.rf[0]"}

Logstash config:

# Receive events from Beats shippers on port 5044 and decode each payload with the json codec.
input {
  beats {
    port => "5044"
	codec => json
  }
}
# Correlate the request and response events that share the same web.trace.id
# into a single output event.
# NOTE: as stated in the replies of this thread, the aggregate filter needs the
# pipeline to run with a single worker (pipeline.workers: 1); otherwise events
# of one task may be processed in parallel and are not aggregated.
filter {
	# Stage 1: runs only when no map exists yet for this trace id (map_action "create")
	# and initialises the three message lists used by the later stages.
	aggregate {
    task_id => "%{web.trace.id}"
    code => "
	map['general_message'] = []
	map['message_req'] = []
	map['message_nonreq'] = []
	"
	map_action => "create"
	# Timeout options for this task_id pattern are declared only in this stage.
	timeout => 60
	# NOTE(review): timeout_tags is applied to events generated on timeout; no
	# push_map_as_event_on_timeout is set here, so confirm whether this has any effect.
	timeout_tags => "_aggregatetimeout"
	push_previous_map_as_event => false
    inactivity_timeout => 5
	}
	# Stage 2: request events (isRequest is true) append their message to the map
	# and are then dropped via event.cancel(), so they are not sent to the output.
	if [instock.rf.isRequest] {
		aggregate {
		task_id => "%{web.trace.id}"
		code => "
		map['general_message'] << event.get('message')
		map['message_req'] << event.get('message')
		event.cancel()
		"
		map_action => "update"
		}
	}
	# Stage 3: non-request (response) events append their message, copy the three
	# collected lists onto the event, and close the task (end_of_task => true).
	# NOTE(review): this condition presumably also matches events where the field
	# is missing entirely - confirm that is acceptable.
	if ![instock.rf.isRequest] {
		aggregate {
		task_id => "%{web.trace.id}"
		code => "
		map['general_message'] << event.get('message')
		map['message_nonreq'] << event.get('message')
		event.set('general_message', map['general_message'])
		event.set('message_nonreq', map['message_nonreq'])
		event.set('message_req', map['message_req'])
		"
		map_action => "update"
		end_of_task => true
		}
	}
}
# Send the remaining (non-cancelled) events to Elasticsearch; the index name
# embeds a date pattern (YYYY.MM.dd), giving one index per day.
output {
  elasticsearch {
    hosts => ["dcr-elastic1:9090"]
	index => "wms-debug-%{+YYYY.MM.dd}-000001"
  }
}

Are you setting pipeline.workers 1 and pipeline.ordered true?

1 Like

Thank you for the fast answer, Badger!
I didn't use these settings... I followed your link but didn't find any explanation... Could you please briefly explain how these settings can help me and where I should set them? Filebeat/Logstash/something else?

The aggregate filters require the logstash pipeline to be executed using only one worker, which means that it will not process things in parallel.

So you need to use pipeline.workers: 1 as explained in this documentation.

You can set this in logstash.yml, which will apply to all pipelines that Logstash is running, or, if you are using pipelines.yml, you can set it for just the pipeline where you have the aggregate filter.

Keep in mind that this can impact the performance.

1 Like

Badger and Leandrojmp, thank you very much!!!
Everything is working now as expected!
I uncommented two lines in logstash.yml:

 pipeline.id: main
 pipeline.workers: 1