Hi,
I am trying to merge 2 log lines with the aggregate filter, and after a week of browsing the internet and this forum I still cannot get it working. Below are 2 log lines sharing an identifier, ExchangeId.
What I want to achieve is one document with both JSON lines parsed: the REQ_OUT line in a req-out nested object and the RESP_IN line in a resp-in nested object. As it is now, I get 2 documents and the objects do not exist. The log line is passing the json filter, since I do have a meta object.
Can anyone help?
sample input:
{"timestamp":"2021-04-02T05:49:45.251Z","severity":"INFO","Type":"REQ_OUT","Address":"https://a-link.com","ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2",REQ_OUT:"test1"}
{"timestamp":"2021-04-02T05:50:45.534Z","severity":"INFO","Type":"RESP_IN","ResponseCode":"200","Address":"https://a-link.com","ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2",RESP_IN:"test2"}
logstash filter:
filter {
  json {
    source => "message"
    skip_on_invalid_json => "true"
    target => "meta"
  }
  if [meta][ExchangeId] {
    mutate {
      add_field => { "ExchangeId" => "%{[meta][ExchangeId]}" }
    }
    if [meta][Type] =~ /REQ_OUT/ {
      aggregate {
        task_id => "%{ExchangeId}"
        # first log line, so we save the message to REQ_OUT
        code => "
          map['REQ_OUT'] ||= event.get('message')
        "
        map_action => "create"
        timeout => 15
        push_map_as_event_on_timeout => true
        timeout_tags => ["aggregate_timeout"]
      }
    }
    if [meta][Type] =~ /RESP_IN/ {
      aggregate {
        task_id => "%{ExchangeId}"
        # 2nd log line, so we save the message to RESP_IN
        code => "map['RESP_IN'] ||= event.get('message')"
        map_action => "update"
        end_of_task => true
      }
    }
  }
  # decode the JSON of the messages placed in REQ_OUT and RESP_IN by the aggregate filter
  json {
    source => "REQ_OUT"
    skip_on_invalid_json => "true"
    target => "req-out"
  }
  json {
    source => "RESP_IN"
    skip_on_invalid_json => "true"
    target => "resp-in"
  }
}
Logstash does not give an error:
[DEBUG] 2021-04-02 15:40:45.004 [[main]>worker0] aggregate - Aggregate successful filter code execution {:code=>"\n map['REQ_OUT'] ||= event.get('message')\n "}
[DEBUG] 2021-04-02 15:40:45.005 [[main]>worker0] aggregate - Aggregate successful filter code execution {:code=>"map['RESP_IN'] ||= event.get('message')"}
Badger
April 2, 2021, 4:44pm
2
Logstash does not log an error because you set skip_on_invalid_json to true. That is invalid JSON (the REQ_OUT and RESP_IN keys are not quoted), so it does not get parsed. If you had
... "ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2","RESP_IN":"test2"
Then you could use
json { source => "message" target => "meta" }
aggregate {
  task_id => "%{[meta][ExchangeId]}"
  timeout => 15
  push_map_as_event_on_timeout => true
  code => '
    map["Address"] ||= event.get("[meta][Address]")
    map["RESP_IN"] ||= event.get("[meta][RESP_IN]")
    map["REQ_OUT"] ||= event.get("[meta][REQ_OUT]")
    map["timestamp"] ||= event.get("[meta][timestamp]")
    map["ResponseCode"] ||= event.get("[meta][ResponseCode]")
    event.cancel # If you do not want the source events
  '
  timeout_task_id_field => "ExchangeId"
}
and get
{
"Address" => "https://a-link.com",
"RESP_IN" => "test2",
"@timestamp" => 2021-04-02T16:42:55.594Z,
"timestamp" => "2021-04-02T05:49:45.251Z",
"REQ_OUT" => "test1",
"@version" => "1",
"ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2",
"ResponseCode" => "200"
}
Note that when using push_map_as_event_on_timeout the only things in the pushed event are what you added to the map.
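The invalid-JSON point is easy to verify outside Logstash; a quick sketch with Ruby's standard json library (the shortened strings below are just stand-ins for the sample lines):

```ruby
require 'json'

# Key not quoted, as in the original sample lines: not valid JSON.
bad  = '{"Type":"REQ_OUT",REQ_OUT:"test1"}'
# Same line with the key quoted: parses fine.
good = '{"Type":"REQ_OUT","REQ_OUT":"test1"}'

begin
  JSON.parse(bad)
rescue JSON::ParserError => e
  puts "invalid: #{e.class}"  # the unquoted key is rejected
end

puts JSON.parse(good)["REQ_OUT"]  # => test1
```

With skip_on_invalid_json enabled, the filter silently drops the parse failure instead of tagging it, which is why nothing showed up in the logs.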
Hi,
I'll test this tomorrow. But what do you do when you don't know all the fields in the JSON and want to put them in a nested object?
eg: I want something like this output:
{
"@timestamp": "2021-04-02T16:42:55.594Z",
"@version": "1",
"ExchangeId": "260e06a3-9cb5-4154-bf97-637e929fa4c2",
"resp-in": {
"Address": "https://a-link.com",
"RESP_IN": "test2",
"timestamp": "2021-04-02T05:49:45.251Z",
"fieldx": "something",
"fieldy": "something"
},
"req-out": {
"Address": "https://a-link.com",
"REQ_OUT": "test1",
"timestamp": "2021-04-02T05:49:45.251Z",
"fieldA": "something",
"fieldB": "something",
"ResponseCode": "200"
}
}
Badger
April 2, 2021, 5:38pm
4
I have not tested it, but something like
code => '
  type = event.get("Type")
  if type == "REQ_OUT"
    dst = "req-out"
  elsif type == "RESP_IN"
    dst = "resp-in"
  end
  if dst
    map[dst] ||= {}
    event.to_hash.each { |k, v|
      unless [ "host", "path", "ExchangeId" ].include? k
        map[dst][k] = v
      end
    }
  end
'
Hi,
I do get an "Aggregate successful filter code execution", but I still just get 2 documents, each with only the meta object. Trying to debug:
filter {
  json { source => "message" target => "meta" }
  aggregate {
    task_id => "%{[meta][ExchangeId]}"
    timeout => 15
    push_map_as_event_on_timeout => true
    code => '
      type = event.get("Type")
      if type == "REQ_OUT"
        dst = "req-out"
      elsif type == "RESP_IN"
        dst = "resp-in"
      end
      if dst
        map[dst] ||= {}
        event.to_hash.each { |k, v|
          unless [ "host", "path", "ExchangeId" ].include? k
            map[dst][k] = v
          end
        }
      end
    '
    timeout_task_id_field => "ExchangeId"
    timeout_tags => ['_aggregatetimeout']
  }
}
Did some more debugging, and for some reason the aggregate filter is not working: it is not matching on the ExchangeId and just produces 2 documents.
I've tried to simplify the code, but with no success. The most annoying thing of all is that when I start Logstash in debug mode it does give a
[DEBUG] 2021-04-14 06:57:14.170 [[main]>worker0] aggregate - Aggregate successful filter code execution {:code=>"\n type = event.get(\"Type\")\n map[type] = {}\n event.to_hash.each { |k, v|\n map[type][k] = v\n } \n "}
event, so it does do something. Is this a bug?
aggregate.conf
input {
  file {
    path => "/tmp/test.txt"
  }
}
filter {
  json { source => "message" target => "meta" }
  # To make sure it's not a nested field issue, copy the ExchangeId and Type to upper-level fields
  mutate {
    copy => { "[meta][ExchangeId]" => "ExchangeId" }
(file truncated)
output
{
"Type" => "REQ_OUT",
"@version" => "1",
"host" => "DPA-ELK-LGS02",
"ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2",
"@timestamp" => 2021-04-14T06:46:29.221Z,
"path" => "/tmp/test.txt"
}
{
"Type" => "RESP_IN",
(output truncated)
test.txt
{"timestamp":"2021-04-02T05:50:44.251Z", "severity":"INFO","Type":"REQ_OUT","Address":"https://a-link.com", "ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2","REQ_OUT":"test1"}
{"timestamp":"2021-04-02T05:50:45.534Z","severity":"INFO","Type":"RESP_IN","ResponseCode":"200","Address":"https://a-link.com","ExchangeId":"260e06a3-9cb5-4154-bf97-637e929fa4c2","RESP_IN":"test2"}
Badger
April 14, 2021, 1:36pm
7
If test.txt only has two lines, why would you expect more than two events?
Hi,
maybe I am on the completely wrong track, but I would expect 1 document (when event.cancel is active I get nothing):
{
    "Type" => "REQ_OUT",
    "@version" => "1",
    "host" => "LGS02",
    "ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2",
    "@timestamp" => 2021-04-14T06:46:29.221Z,
    "path" => "/tmp/test.txt",
    "REQ_OUT" => {
        "timestamp" => "2021-04-02T05:50:45.534Z",
        "severity" => "INFO",
        ...
    },
    "RESP_IN" => {
        "timestamp" => "2021-04-02T05:50:44.251Z",
        "severity" => "INFO",
        "ResponseCode" => "200",
        ...
    },
    "@version" => "1",
    "host" => "LGS02",
    "ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2",
    "@timestamp" => 2021-04-14T06:46:29.247Z,
    "path" => "/tmp/test.txt"
}
Badger
April 14, 2021, 3:00pm
9
Using
json { source => "message" target => "[@metadata][data]" remove_field => [ "message" ] }
aggregate {
  task_id => "%{[@metadata][data][ExchangeId]}"
  timeout => 5
  push_map_as_event_on_timeout => true
  code => '
    # we want 2 nested objects in the document: REQ_OUT and RESP_IN
    type = event.get("[@metadata][data][Type]")
    map[type] = {}
    event.get("[@metadata][data]").each { |k, v|
      unless [ "ExchangeId" ].include? k
        map[type][k] = v
      end
    }
    event.to_hash.each { |k, v|
      unless [ "@version" ].include? k
        map[type][k] = v
      end
    }
    event.cancel
  '
  timeout_task_id_field => "ExchangeId"
  timeout_tags => ['_aggregatetimeout']
}
I get
{
"RESP_IN" => {
"RESP_IN" => "test2",
"severity" => "INFO",
"path" => "/home/user/foo.txt",
"Type" => "RESP_IN",
"ResponseCode" => "200",
"@timestamp" => 2021-04-14T14:57:51.115Z,
"Address" => "https://a-link.com",
"host" => "...",
"timestamp" => "2021-04-02T05:50:45.534Z"
},
"REQ_OUT" => {
"severity" => "INFO",
"path" => "/home/user/foo.txt",
"Type" => "REQ_OUT",
"REQ_OUT" => "test1",
"@timestamp" => 2021-04-14T14:57:51.115Z,
"Address" => "https://a-link.com",
"host" => "...",
"timestamp" => "2021-04-02T05:50:44.251Z"
},
"@timestamp" => 2021-04-14T14:58:01.101Z,
"@version" => "1",
"ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2",
"tags" => [
[0] "_aggregatetimeout"
]
}
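The nesting logic of that working config can be sketched in plain Ruby; the two hashes per sample below are stand-ins for `[@metadata][data]` (the parsed JSON) and the remaining `event.to_hash` fields, which the real filter supplies:

```ruby
# Simulate nesting each event's parsed JSON plus the Logstash-level fields
# under its Type, as the aggregate code above does with the shared map.
map = {}

samples = [
  { data:  { "Type" => "REQ_OUT", "REQ_OUT" => "test1",
             "ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2" },
    extra: { "host" => "LGS02", "path" => "/tmp/test.txt", "@version" => "1" } },
  { data:  { "Type" => "RESP_IN", "RESP_IN" => "test2", "ResponseCode" => "200",
             "ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2" },
    extra: { "host" => "LGS02", "path" => "/tmp/test.txt", "@version" => "1" } }
]

samples.each do |s|
  type = s[:data]["Type"]
  map[type] = {}
  s[:data].each  { |k, v| map[type][k] = v unless k == "ExchangeId" }  # drop the task id
  s[:extra].each { |k, v| map[type][k] = v unless k == "@version" }    # drop @version
end

p map.keys
# => ["REQ_OUT", "RESP_IN"]
```

On timeout, push_map_as_event_on_timeout then emits the map as one event, with timeout_task_id_field adding the ExchangeId back at the top level.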
Which version of Logstash are you running? I wonder if you are hitting the issue mentioned in this thread.
Hi,
I'm running 7.8.0, so I guess I hit that bug. I'll try to set pipeline.java_execution or just upgrade to the latest version.
Pretty sure this is the solution. You are a god among men!
system
(system)
Closed
May 12, 2021, 3:15pm
11
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.