Adding fields to next line using Logstash Aggregate

Hi,

  • I'm trying to implement a use case where a field that exists on one line is copied onto the following lines that lack it.
  • I tried to implement this using the aggregate filter with pipeline.workers set to 1.
  • The sample log file is below:
Data1 HEADER1 Key1 - Val1
Data2 Key2 - Val2
Data3 Key3 - Val3
Data4 HEADER2 Key4 - Val4
Data5 Key5 - Val5
Data6 Key6 - Val6
  • The expected output is (only the first two events shown):
{
        "header" => "HEADER1",
           "key" => "Key1",
           "val" => "Val1",
       "task_id" => "all",
          "data" => "Data1",
    "@timestamp" => 2019-11-20T11:16:54.050Z
}
{
        "header" => "HEADER1",
           "key" => "Key2",
           "val" => "Val2",
       "task_id" => "all",
          "data" => "Data2",
    "@timestamp" => 2019-11-20T11:16:54.051Z
}
  • i.e. the header field from the first line is added as a new field on the second line, and so on.

  • But for some reason all the lines with a header field are processed before the lines without one, which results in output that looks like this:

{
        "header" => "HEADER1",
           "key" => "Key1",
           "val" => "Val1",
       "task_id" => "all",
          "data" => "Data1",
    "@timestamp" => 2019-11-20T11:16:54.050Z
}
{
        "header" => "HEADER2",
           "key" => "Key4",
           "val" => "Val4",
       "task_id" => "all",
          "data" => "Data4",
    "@timestamp" => 2019-11-20T11:16:54.051Z
}
{
        "header" => "HEADER2",
           "key" => "Key2",
           "val" => "Val2",
       "task_id" => "all",
          "data" => "Data2",
    "@timestamp" => 2019-11-20T11:16:54.051Z
}
{
        "header" => "HEADER2",
           "key" => "Key3",
           "val" => "Val3",
       "task_id" => "all",
          "data" => "Data3",
    "@timestamp" => 2019-11-20T11:16:54.051Z
}
{
        "header" => "HEADER2",
           "key" => "Key5",
           "val" => "Val5",
       "task_id" => "all",
          "data" => "Data5",
    "@timestamp" => 2019-11-20T11:16:54.051Z
}
{
        "header" => "HEADER2",
           "key" => "Key6",
           "val" => "Val6",
       "task_id" => "all",
          "data" => "Data6",
    "@timestamp" => 2019-11-20T11:16:54.051Z
}
  • Here is my current Logstash config:
input {
    beats {
        port => 5044
    }
}

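filter {

    # The first pattern matches lines that carry a header, the second matches lines without one.
    grok {
        match => {
            "message" => ["%{WORD:data}\s%{WORD:header}\s%{WORD:key} - %{WORD:val}", "%{WORD:data}\s%{WORD:key} - %{WORD:val}"]
        }
    }

    # Constant task_id so that every event shares a single aggregate map.
    mutate {
        add_field => {"task_id" => "all"}
    }

    if [header] {
        # Remember the most recent header in the shared map.
        aggregate {
            task_id => "%{task_id}"
            code => "map['header'] = event.get('header')"
        }
    } else {
        # Copy the remembered header onto lines that have none (runs only if the map already exists).
        aggregate {
            task_id => "%{task_id}"
            code => "event.set('header', map['header'])"
            map_action => "update"
            timeout => 0
        }
    }
}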

output {
    stdout {
        codec => rubydebug  
    } 
}
  • Filebeat config:
filebeat.inputs:
    - type: log
      paths:
          - "/home/user/downloads/logs/test.log"

output.logstash:
    hosts: "localhost:5044"

I'd appreciate any help on this.

Have you set pipeline.java_execution: false in logstash.yml? If not, events get re-ordered by the Java execution engine, even with pipeline.workers set to 1.
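
For reference, here is a minimal sketch of how the two settings mentioned in this thread might look in logstash.yml. It assumes a Logstash release that still ships the Ruby execution engine; pipeline.java_execution is not available in every version, so check the settings reference for the release you run.

```yaml
# logstash.yml (sketch; assumes a release that still supports pipeline.java_execution)

# The aggregate filter keeps its maps in memory, so run a single worker
# thread to keep events from being processed concurrently.
pipeline.workers: 1

# Use the Ruby execution engine instead of the Java one. Per the reply
# above, the Java engine can re-order events, which breaks aggregate
# logic that depends on line order.
pipeline.java_execution: false
```

Logstash needs a restart to pick up changes to this file. As far as I know, later releases add a pipeline.ordered setting aimed at the same ordering problem, but whether it is available depends on your version.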

Thanks @Badger, that resolved it.
