Make aggregate filter to take only unique (first) message

Hi, everyone!
The problem is that sometimes we get logs with the same task_id (instock.rf.screenId in our case) and message fields. It looks like this:

"_source": {
"@timestamp": "2024-05-28T05:43:34.769Z",
"input": {
"type": "log"
},
"ecs": {
"version": "8.0.0"
},
"agent": {
"ephemeral_id": "15caa6aa-1f2e-4d08-8943-1ef84696a2bb",
"id": "cf41eacd-75d1-41cc-b087-547f594ace02",
"name": "MASH-WMS7-01",
"type": "filebeat",
"version": "8.12.1"
},
"thread_name": "RFWS: 1196",
"level_value": 20000,
"service": {
"environment": "AA_PROD_MASH_v7"
},
"pod": {
"name": "wms-prod-mash.rf[0]"
},
"instock": {
"rf": {
"process": "com.instock.server.rf.ship.LPCombine",
"answerBack": "1196",
"screenName": "ship.Shipping.SOHStatus",
"elapsedTime": 39,
"device": 2268,
"screenId": "1c5e269d:18fbd5f579e:1f5",
"login": "17575",
"isRequest": false,
"fork": "FORK137"
},
"serviceset": "rf"
},
"message": "<-title=Container - 17575, =Order:, order=0177050355, =ISW:, wave=6634, status=Left warehouse, =% Dated:, releasedPercent=100, =% Gathered:, pickedPercent=76.953, =% Checked:, =0, =% Ready:, readyPercent=0, =% Loaded:, loadedPercent=0, =Cell:, remainQty=0, = /, remainLoc=0, F2=Save, F5=Print, ",
"web": {
"trace": {
"id": "83dd6d05-0091-4f19-bbb3-a503c4d51b16"
}
},
"fields": {
"type": "rf"
}

We can receive 3-5 identical messages, and they will all be aggregated into 1 long message.
How can I select only the first message and ignore all the others?
My logstash config is:

input {
  beats {
    port => "5044"
	codec => json
  }
}
filter {
		mutate {
				add_tag => ["%{instock.rf.isRequest}"]
		}	
		elapsed {
			start_tag => "false"
			end_tag => "true"
			unique_id_field => "instock.rf.screenId"
			timeout => 600
			}
		aggregate {
		task_id => "%{instock.rf.screenId}"
		code => "
		map['general_message'] = []
		map['message_reply'] = []
		map['message_request'] = []
		"
		map_action => "create"
		timeout => 600
		timeout_tags => "_aggregatetimeout"
		push_previous_map_as_event => false
		inactivity_timeout => 600
		}
		
		if ![instock.rf.isRequest] {
			aggregate {
			task_id => "%{instock.rf.screenId}"
			code => "
			map['general_message'] << event.get('message')
			map['message_request'] << event.get('message')
			event.cancel()
			"
			map_action => "update"
			}

		}
	
		if [instock.rf.isRequest] {	
			aggregate {
			task_id => "%{instock.rf.screenId}"
			code => "
			map['general_message'] << event.get('message')
			map['message_reply'] << event.get('message')
			event.set('general_message', map['general_message'])
			event.set('message_request', map['message_request'])
			event.set('message_reply', map['message_reply'])
			"
			map_action => "update"
			push_previous_map_as_event => false
			end_of_task => true
			}
			mutate {
				remove_field => [ "message" ]
				add_tag => ["aa_test_mash_v7"]
				}
		}
		
	}

output {
  if "aa_prod_mash_v7" in [tags] {
  elasticsearch {
    hosts => ["dcr-elastic1.int.company.com:9090"]
	index => "wms-debug-%{+YYYY.MM.dd}-000001"
  }
  } else {
  elasticsearch {
    hosts => ["dcr-elastic1.int.company.com:9090"]
	index => "wms-debug-test%{+YYYY.MM.dd}-000001"
  }  
  }
}

I came across some threads on this forum about using fingerprint, but I really cannot understand how to apply it here. I added these lines

		fingerprint {
			"method" => "SHA1"
			"key" => "Phrase"
			"source" => ['instock.rf.screenId']
		}

in my Logstash config, and now I can see it in ELK, but it doesn't help me at all — my general_message is still collecting lots of unnecessary repeated data.
In the normal case it should take the aggregated message from the if ![instock.rf.isRequest] branch and only 1 message from the if [instock.rf.isRequest] branch.

Maybe I can use some sort of string comparison, so that if the aggregated result already contains the text of the incoming message, the message gets dropped or something? The comparison itself is simple, but how do I deploy it inside the aggregate filter?

have you looked at collapse API

Collapse search results | Elasticsearch Guide [8.14] | Elastic.

The thing is that I don't think it can help me, because I already get the incorrect final message at the Logstash level. Let me try to explain and show you. As you can see, in my first post I showed only 1 log entry — my bad. If you look at the aggregate config, I'm building the general_message field by appending data from the message fields of several log lines (plus I rename each message field into message_request and message_reply and also add them to my final result). In ELK, at the end, I see a log entry that looks like this:

    "general_message": [
      "<-title=Department - 12932, msg=Enter number:, pickedMsg=Selected, =/, =Cell:, location=PT-06-20-05-03, =Controller:, lp=P0001582111, =#:, sku=, F8=List, F6=Take container, F7=Take cargo from container , F10=Enter quantity, ",
      "<-title=Department - 12932, msg=Enter number:, pickedMsg=Selected, =/, =Cell:, location=PT-06-20-05-03, =Controller:, lp=P0001582111, =#:, sku=, F8=List, F6=Take container, F7=Take cargo from container , F10=Enter quantity, ",
      "<-title=Department - 12932, msg=Enter number:, pickedMsg=Selected, =/, =Cell:, location=PT-06-20-05-03, =Controller:, lp=P0001582111, =#:, sku=, F8=List, F6=Take container, F7=Take cargo from container , F10=Enter quantity, ",
      "<-title=Department - 12932, msg=Enter number:, pickedMsg=Selected, =/, =Cell:, location=PT-06-20-05-03, =Controller:, lp=P0001582111, =#:, sku=, F8=List, F6=Take container, F7=Take cargo from container , F10=Enter quantity, ",
      "<-title=Department - 12932, msg=Enter number:, pickedMsg=Selected, =/, =Cell:, location=PT-06-20-05-03, =Controller:, lp=P0001582111, =#:, sku=, F8=List, F6=Take container, F7=Take cargo from container , F10=Enter quantity, ",
      "->sku=, ___action___=F6, "
    ],
    "message_reply": [
      "->sku=, ___action___=F6, "
    ],
    "message_request": [
      "<-title=Department - 12932, msg=Enter number:, pickedMsg=Selected, =/, =Cell:, location=PT-06-20-05-03, =Controller:, lp=P0001582111, =#:, sku=, F8=List, F6=Take container, F7=Take cargo from container , F10=Enter quantity, ",
      "<-title=Department - 12932, msg=Enter number:, pickedMsg=Selected, =/, =Cell:, location=PT-06-20-05-03, =Controller:, lp=P0001582111, =#:, sku=, F8=List, F6=Take container, F7=Take cargo from container , F10=Enter quantity, ",
      "<-title=Department - 12932, msg=Enter number:, pickedMsg=Selected, =/, =Cell:, location=PT-06-20-05-03, =Controller:, lp=P0001582111, =#:, sku=, F8=List, F6=Take container, F7=Take cargo from container , F10=Enter quantity, ",
      "<-title=Department - 12932, msg=Enter number:, pickedMsg=Selected, =/, =Cell:, location=PT-06-20-05-03, =Controller:, lp=P0001582111, =#:, sku=, F8=List, F6=Take container, F7=Take cargo from container , F10=Enter quantity, ",
      "<-title=Department - 12932, msg=Enter number:, pickedMsg=Selected, =/, =Cell:, location=PT-06-20-05-03, =Controller:, lp=P0001582111, =#:, sku=, F8=List, F6=Take container, F7=Take cargo from container , F10=Enter quantity, " "
    ],

Just so you understand the whole situation: in an ideal world I would get only 2 log lines from the source file, covering the message_request and message_reply cases (I'm collecting data with Filebeat from log files generated by the warehouse software), but in some cases it generates 5 completely identical log lines for the message_request case (even the timestamp matches). The developers of this software answered me that it is "by design" and they won't try to fix it.

How can I compare the message field with general_message? I could try something like the config below, but I'm not sure about the syntax or about the place in the Logstash config where I put it:

		if [instock.rf.isRequest] {
			if [message] in [general_message] {
				drop {}
			} else {			
				aggregate {
				task_id => "%{instock.rf.screenId}"
				code => "
				map['general_message'] << event.get('message')
				map['message_reply'] << event.get('message')
				event.set('general_message', map['general_message'])
				event.set('message_request', map['message_request'])
				event.set('message_reply', map['message_reply'])
				event.set('fingerprint', map['fingerprint'])
				#event.cancel()
				"
				map_action => "update"
				#timeout => 600
				#timeout_tags => "_aggregatetimeout"
				push_previous_map_as_event => false
				add_tag => ["%{web.trace.id}"]
				end_of_task => true
				}
			}
			mutate {
				remove_field => [ "message" ]
				add_tag => ["aa_test_mash_v7"]
				#add_tag => ["end"]
				}
		}

It's a pity that no one helped and no good solution for this task was found.
My solution is not ideal, but it works for me. I used grok to parse the final general_message. I had to name it dr_general_message during the aggregate phase, and even temp_general_message for the grok parsing (I don't know why, but grok refused to work with dr_general_message), deleting the temporary field at the end. I hope this will help someone with a similar problem...
Full logstash config:

input {
  beats {
    port => "5044"
	codec => json
  }
}
filter {

		elapsed {
			start_tag => "false"
			end_tag => "true"
			unique_id_field => "instock.rf.screenId"
			timeout => 600
			}
			
		aggregate {
		task_id => "%{instock.rf.screenId}"
		code => "
		map['id'] = event.get('fingerprint')
		map['dr_general_message'] = []
		map['dr_message_reply'] = []
		map['dr_message_request'] = []
		"
		map_action => "create"
		timeout => 600
		timeout_tags => "_aggregatetimeout"
		push_previous_map_as_event => false
		inactivity_timeout => 600
		}
		
		if ![instock.rf.isRequest] {
		
			aggregate {
			task_id => "%{instock.rf.screenId}"
			code => "
			map['dr_general_message'] << event.get('message')
			map['dr_message_request'] << event.get('message')
			event.cancel()
			"
			map_action => "update"
			add_tag => [ "begin_ag" ]
			}
		}
	
		if [instock.rf.isRequest] {

			aggregate {
			task_id => "%{instock.rf.screenId}"
			code => "
			map['dr_general_message'] << event.get('message')
			map['dr_message_reply'] << event.get('message')
			event.set('dr_general_message', map['dr_general_message'])
			event.set('dr_message_request', map['dr_message_request'])
			event.set('dr_message_reply', map['dr_message_reply'])
			"
			map_action => "update"
			push_previous_map_as_event => false
			add_tag => ["%{web.trace.id}"]
			end_of_task => true
			}
			
			mutate {
				add_field => { "temp_general_message" => "%{dr_general_message}" }				
				remove_field => [ "message" ]
				add_tag => ["aa_test_mash_v7"]
				}
				
			grok {
				match => {
					"temp_general_message" => [
					"%{DATA:message_request}, ,%{GREEDYDATA:to_delete}\->%{GREEDYDATA:message_reply}",
					"%{DATA:message_reply},%{GREEDYDATA:to_delete}"
					]
				}
			}
			
			mutate {
				gsub => ["message_reply", "^", "->"]
				add_field => { "general_message" => "%{message_request}, , %{message_reply}" }	
				remove_field => [ "message" ]
				remove_field => [ "temp_general_message" ]
				}
		# if [instock.rf.isRequest] end
		}
	#filter end	
	}

output {
  if "aa_prod_mash_v7" in [tags] {
  jdbc{
    driver_jar_path => "C:\instock\sqljdbc_12.6\rus\jars\mssql-jdbc-12.6.1.jre8.jar"
    connection_string => "jdbc:sqlserver://mash-sql04:1433;databaseName=WMSlogs_DB;encrypt=false;user= any_user;password=some_password"
    statement => [ "INSERT into dialog (timestamp,hostname,general_message,instockrfanswerBack,instockrfdevice,instockrfelapsedTime,instockrffork,instockrfisRequest,instockrflogin,instockrfprocess,instockrfscreenName,message_nonreq,message_req,podname,serviceenvironment,thread_name,webtraceid,elapsed_time) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", "@timestamp","[host][name]","general_message","instock.rf.answerBack","instock.rf.device","instock.rf.elapsedTime","instock.rf.fork","instock.rf.isRequest","instock.rf.login","instock.rf.process","instock.rf.screenName","message_nonreq","message_req","pod.name","service.environment","thread_name","web.trace.id","elapsed_time" ]
  }
  elasticsearch {
    hosts => ["dcr-elastic1.company.com:9090"]
	index => "wms-debug-%{+YYYY.MM.dd}-000001"
  }
  } else {
  jdbc{
    driver_jar_path => "C:\instock\sqljdbc_12.6\rus\jars\mssql-jdbc-12.6.1.jre8.jar"
    connection_string => "jdbc:sqlserver://mash-sql01:1433;databaseName=iswms_logs;encrypt=false;user=any_user;password= some_password"
	statement => [ "INSERT into dialog (timestamp,hostname,general_message,instockrfanswerBack,instockrfdevice,instockrfelapsedTime,instockrffork,instockrfisRequest,instockrflogin,instockrfprocess,instockrfscreenName,message_request,message_reply,podname,serviceenvironment,thread_name,instockrfscreenId,elapsed_time) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", "@timestamp","[host][name]","general_message","instock.rf.answerBack","instock.rf.device","instock.rf.elapsedTime","instock.rf.fork","instock.rf.isRequest","instock.rf.login","instock.rf.process","instock.rf.screenName","message_request","message_reply","pod.name","service.environment","thread_name","instock.rf.screenId","elapsed_time" ]
  }
  elasticsearch {
    hosts => ["dcr-elastic1.company.com:9090"]
	index => "wms-debug-test%{+YYYY.MM.dd}-000001"
  }  
  }
}