Store a field as a global var

Hello everyone,

I am a new user of the ELK stack and I am facing a problem that I cannot solve.
I'm using Filebeat to collect data from several log files and send them to Logstash for parsing.
Is it possible for Logstash to store the value of a field from the first log file as a "global" variable, in order to add it as a new field in the other log files?
Thanks a lot for any help or ideas to explore.

Regards,
KSA

There is no real way to do this, by design.

As a pipeline transformation tool, the only way Logstash can achieve the level of performance it does is to eschew shared state (especially externally-mutable shared state) as much as possible.

What is the intended goal? Is the source information a natural part of your logs, or could a Logstash filter acquire the information in some other way?

Hi Ry,

Thank you for your answer.
We have a Filebeat service that loops over several log files and sends them to Logstash, where they go through a filter with mutate and grok processing.
The intended goal is to take a field collected from the first parsed log file and add it to the other files.
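For context, the logtype field used in the filters below is set on the Filebeat side, roughly like this (paths are simplified placeholders):

filebeat.prospectors:
  - input_type: log
    paths:
      - /var/log/myapp/first_log_file.csv      # placeholder path
    fields:
      logtype: first_log_file
    fields_under_root: true                    # so the field is [logtype], not [fields][logtype]
  - input_type: log
    paths:
      - /var/log/myapp/second_log_file.csv     # placeholder path
    fields:
      logtype: second_log_file
    fields_under_root: true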

For example, the first log file is filtered like this:

if [logtype] == "first_log_file" {
    csv {
      columns => [
        "datetime",
        "column1",
        "column2",
        "column3"
      ]
      separator => ";"
      skip_empty_columns => true
    }
    date {
      match => ["datetime", "yyyy-MM-dd HH:mm:ss.SSSSSS"]
      remove_field => ["datetime"]
    }
    
  }

The second log file is parsed like this:

if [logtype] == "second_log_file" {
    csv {
      columns => [
        "datetime",
        "column1"
      ]
      separator => ";"
      skip_empty_columns => true
    }
    date {
      match => ["datetime", "yyyy-MM-dd HH:mm:ss.SSSSSS"]
      remove_field => ["datetime"]
    }
    
  }

Is there a way to add the value of column3 (from the first file) as a new field in the second file, something like this:

    if [logtype] == "second_log_file" {
      csv {
        columns => [
          "datetime",
          "column1"
        ]
        separator => ";"
        skip_empty_columns => true
      }
      date {
        match => ["datetime", "yyyy-MM-dd HH:mm:ss.SSSSSS"]
        remove_field => ["datetime"]
      }
      
      mutate {
        add_field => {
          "new_field" => "default"
        }
        
        copy => { 
          "column3_first_file" => "new_field" 
       }
    }
 } 

Regards,
KSA

The biggest trouble here is that we're working with tools that do not guarantee strict ordering (again, by design), so it is very difficult to define or guarantee "first":

  • Filebeat prospects multiple files concurrently, and may interleave events from one file with events from a different file when sending to Logstash; you can make your own guarantee that Filebeat never sees more than one file at a time by strictly controlling the directory it is prospecting, but there are dangers here that would be very hard to track down.
  • Filebeat sends messages to Logstash using the Lumberjack protocol over TCP; it can be configured to send the messages over a single connection, which will constrain order at the cost of throughput (see the sketch after this list).
  • Filebeat also emits a constant stream to Logstash; when it encounters a new file, events extracted from that file are emitted as part of the normal stream -- there is no way to tell which event is the new "first" line of the next file.
  • Logstash by default spins up multiple worker threads to spread load across CPUs and allow work to be done while other work is waiting on IO; it can also be constrained to process one event at a time (also sketched below), but performance is significantly impacted.
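For completeness, these are the kinds of settings involved; the host name and values below are illustrative only, and I would not recommend relying on this:

# filebeat.yml -- a single Logstash host, a single connection (host is illustrative)
output.logstash:
  hosts: ["logstash.example.com:5044"]
  worker: 1
  loadbalance: false

# logstash.yml -- one worker processing one event per batch (expect a large throughput hit)
pipeline.workers: 1
pipeline.batch.size: 1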

All of that considered, the only thing I can think of is to send entire documents to Logstash using Filebeat's multiline configuration (there is an example of how to do this in a different Discuss Topic), and then use Logstash to first extract the shared metadata from the first line and split the rest into multiple events from there.
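On the Filebeat side, the trick would be a multiline pattern that never matches a real line, so every line in a file is appended to the first one until the timeout or line limit is reached; I haven't tested this, and the paths and limits below are only illustrative:

filebeat.prospectors:
  - input_type: log
    paths:
      - /var/log/myapp/*.csv                 # illustrative path
    multiline.pattern: '^WILL_NEVER_MATCH$'  # a pattern no real line matches
    multiline.negate: true                   # so every line is treated as a continuation
    multiline.match: after
    multiline.max_lines: 10000               # raise above the largest file's line count
    multiline.timeout: 5s                    # flush the combined event after 5s of quiet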

The following Logstash pseudo-code hasn't been tested either, but may be helpful to convey the process I envision:

input {
  beats {
    # ...
  }
}
filter {
  # first, break the giant event into a "first" line and a "rest" field containing the remaining lines:
  grok {
    pattern_definitions => {
      "NOTNEWLINE" => "[^\n]*"
      "ULTRAGREEDYDATA" => "(.|\r|\n)*"
    }
    match => {
      "message" => "\A%{NOTNEWLINE:[@metadata][first]}\n%{ULTRAGREEDYDATA:[@metadata][rest]}\Z"
    }
    remove_field => ["message"]
  }

  # then process the first line, extracting bits to `[@metadata][shared]`
  csv {
    source => "[@metadata][first]"
    columns => [
      "[@metadata][shared][datetime]",
      "[@metadata][shared][column1]",
      "[@metadata][shared][column2]",
      "[@metadata][shared][column3]"
    ]
    separator => ";"
    skip_empty_columns => true
  }

  # now that we are done extracting the first row, split the event into one event per line in the "rest";
  # each resulting event will keep everything we already put into `[@metadata][shared]`.
  split {
    field => "[@metadata][rest]"
  }

  # now we're processing individual rows
  csv {
    source => "[@metadata][rest]"
    columns => [
      "datetime",
      "column1"
    ]
    separator => ";"
    skip_empty_columns => true
  }
  date {
    match => ["datetime", "yyyy-MM-dd HH:mm:ss.SSSSSS"]
    remove_field => ["datetime"]
  }

  # from here it's up to you; you can copy fields from the shared metadata onto the event:
  mutate {
    copy => {
      "[@metadata][shared][column3]" => "column3"
    }
  }
}
output {
  # ...
}