Hi,

I'm trying to implement a use case where I want to take an existing field from one line and add it as a new field on the lines that follow it. I tried to implement this using the aggregate filter with pipeline.workers set to 1 (the worker setting is shown just after the sample log below).

The sample log file is as below:
Data1 HEADER1 Key1 - Val1
Data2 Key2 - Val2
Data3 Key3 - Val3
Data4 HEADER2 Key4 - Val4
Data5 Key5 - Val5
Data6 Key6 - Val6
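In case it's relevant, this is roughly how the pipeline is limited to one worker (assuming it's set in logstash.yml; the same thing can be done with -w 1 on the command line):

# logstash.yml -- run the pipeline with a single worker thread
pipeline.workers: 1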
The expected output is (only showing some of the lines):
{
"header" => "HEADER1",
"key" => "Key1",
"val" => "Val1",
"task_id" => "all",
"data" => "Data1",
"@timestamp" => 2019-11-20T11:16:54.050Z
}
{
"header" => "HEADER1",
"key" => "Key2",
"val" => "Val2",
"task_id" => "all",
"data" => "Data2",
"@timestamp" => 2019-11-20T11:16:54.051Z
}
i.e. the header field from the first line gets added as a new field on the second line, and so on.

But for some reason all the lines with the header field are getting processed before the lines without it, ultimately resulting in an output that looks like this:
{
"header" => "HEADER1",
"key" => "Key1",
"val" => "Val1",
"task_id" => "all",
"data" => "Data1",
"@timestamp" => 2019-11-20T11:16:54.050Z
}
{
"header" => "HEADER2",
"key" => "Key4",
"val" => "Val4",
"task_id" => "all",
"data" => "Data4",
"@timestamp" => 2019-11-20T11:16:54.051Z
}
{
"header" => "HEADER2",
"key" => "Key2",
"val" => "Val2",
"task_id" => "all",
"data" => "Data2",
"@timestamp" => 2019-11-20T11:16:54.051Z
}
{
"header" => "HEADER2",
"key" => "Key3",
"val" => "Val3",
"task_id" => "all",
"data" => "Data3",
"@timestamp" => 2019-11-20T11:16:54.051Z
}
{
"header" => "HEADER2",
"key" => "Key5",
"val" => "Val5",
"task_id" => "all",
"data" => "Data5",
"@timestamp" => 2019-11-20T11:16:54.051Z
}
{
"header" => "HEADER2",
"key" => "Key6",
"val" => "Val6",
"task_id" => "all",
"data" => "Data6",
"@timestamp" => 2019-11-20T11:16:54.051Z
}
Here is my current Logstash config:
input {
  beats {
    port => 5044
  }
}
filter {
  # two grok patterns: lines that carry a header token and lines that don't
  grok {
    match => {
      "message" => ["%{WORD:data}\s%{WORD:header}\s%{WORD:key} - %{WORD:val}", "%{WORD:data}\s%{WORD:key} - %{WORD:val}"]
    }
  }
  # constant task_id so every event belongs to the same aggregate map
  mutate {
    add_field => { "task_id" => "all" }
  }
  if [header] {
    # header lines: store the header in the shared map
    aggregate {
      task_id => "%{task_id}"
      code => "map['header'] = event.get('header')"
    }
  } else {
    # non-header lines: copy the stored header onto the event
    aggregate {
      task_id => "%{task_id}"
      code => "event.set('header', map['header'])"
      map_action => "update"
      timeout => 0
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
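Just to be explicit about the behaviour I'm after, the following ruby filter is a rough sketch of the same carry-forward logic (not what I'm actually running, and it only makes sense with a single worker):

filter {
  ruby {
    # remember the last header seen and copy it onto lines that don't have one
    init => "@@last_header = nil"
    code => "
      if event.get('header')
        @@last_header = event.get('header')
      elsif @@last_header
        event.set('header', @@last_header)
      end
    "
  }
}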
Filebeat config:
filebeat.inputs:
- type: log
  paths:
    - "/home/user/downloads/logs/test.log"
output.logstash:
  hosts: ["localhost:5044"]
Appreciate any help on this.