Hi, everyone!
The problem is that sometimes we get logs with same task_id (instock.rf.screenId in our case) and message fields. It look like this:
"_source": {
"@timestamp": "2024-05-28T05:43:34.769Z",
"input": {
"type": "log"
},
"ecs": {
"version": "8.0.0"
},
"agent": {
"ephemeral_id": "15caa6aa-1f2e-4d08-8943-1ef84696a2bb",
"id": "cf41eacd-75d1-41cc-b087-547f594ace02",
"name": "MASH-WMS7-01",
"type": "filebeat",
"version": "8.12.1"
},
"thread_name": "RFWS: 1196",
"level_value": 20000,
"service": {
"environment": "AA_PROD_MASH_v7"
},
"pod": {
"name": "wms-prod-mash.rf[0]"
},
"instock": {
"rf": {
"process": "com.instock.server.rf.ship.LPCombine",
"answerBack": "1196",
"screenName": "ship.Shipping.SOHStatus",
"elapsedTime": 39,
"device": 2268,
"screenId": "1c5e269d:18fbd5f579e:1f5",
"login": "17575",
"isRequest": false,
"fork": "FORK137"
},
"serviceset": "rf"
},
"message": "<-title=Container - 17575, =Order:, order=0177050355, =ISW:, wave=6634, status=Left warehouse, =% Dated:, releasedPercent=100, =% Gathered:, pickedPercent=76.953, =% Checked:, =0, =% Ready:, readyPercent=0, =% Loaded:, loadedPercent=0, =Cell:, remainQty=0, = /, remainLoc=0, F2=Save, F5=Print, ",
"web": {
"trace": {
"id": "83dd6d05-0091-4f19-bbb3-a503c4d51b16"
}
},
"fields": {
"type": "rf"
}
We can receive 3-5 same messages and they all will be aggregated to 1 long message.
How can I select only first message and ignore all others?
My logstash config is:
input {
beats {
port => "5044"
codec => json
}
}
filter {
mutate {
add_tag => ["%{instock.rf.isRequest}"]
}
elapsed {
start_tag => "false"
end_tag => "true"
unique_id_field => "instock.rf.screenId"
timeout => 600
}
aggregate {
task_id => "%{instock.rf.screenId}"
code => "
map['general_message'] = []
map['message_reply'] = []
map['message_request'] = []
"
map_action => "create"
timeout => 600
timeout_tags => "_aggregatetimeout"
push_previous_map_as_event => false
inactivity_timeout => 600
}
if ![instock.rf.isRequest] {
aggregate {
task_id => "%{instock.rf.screenId}"
code => "
map['general_message'] << event.get('message')
map['message_request'] << event.get('message')
event.cancel()
"
map_action => "update"
}
}
if [instock.rf.isRequest] {
aggregate {
task_id => "%{instock.rf.screenId}"
code => "
map['general_message'] << event.get('message')
map['message_reply'] << event.get('message')
event.set('general_message', map['general_message'])
event.set('message_request', map['message_request'])
event.set('message_reply', map['message_reply'])
"
map_action => "update"
push_previous_map_as_event => false
end_of_task => true
}
mutate {
remove_field => [ "message" ]
add_tag => ["aa_test_mash_v7"]
}
}
}
output {
if "aa_prod_mash_v7" in [tags] {
elasticsearch {
hosts => ["dcr-elastic1.int.company.com:9090"]
index => "wms-debug-%{+YYYY.MM.dd}-000001"
}
} else {
elasticsearch {
hosts => ["dcr-elastic1.int.company.com:9090"]
index => "wms-debug-test%{+YYYY.MM.dd}-000001"
}
}
}