Store a field as a global var

The biggest trouble here is that we're working with tools that do not guarantee strict ordering (again, by design), so it is very difficult to define or guarantee "first":

  • Filebeat prospects multiple files concurrently, and may interleave events from one file with events from another when sending to Logstash; you can make your own guarantee that Filebeat never sees more than one file at a time by strictly controlling the directory it prospects from, but the failure modes there would be very hard to track down.
  • Filebeat sends messages to Logstash using the Lumberjack protocol over TCP; it can be configured to send the messages over a single connection, which constrains order at the cost of throughput.
  • Filebeat also emits a continuous stream to Logstash; when it encounters a new file, events extracted from that file are emitted as part of the normal stream, so there is no marker that distinguishes the "first" event of a new file from everything else.
  • Logstash by default spins up multiple worker threads to spread load across CPUs and to allow work to continue while other work is waiting on IO; it can also be constrained to process one thing at a time, but performance is significantly impacted (see the sketch after this list).
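
For reference, constraining that concurrency looks roughly like this (an untested sketch; `worker` and `loadbalance` on Filebeat's Logstash output and `pipeline.workers` in logstash.yml are the settings I mean above, and the host is a placeholder -- check them against the docs for your versions):

# filebeat.yml -- a single connection/worker to the Logstash output
output.logstash:
  hosts: ["logstash:5044"]
  worker: 1
  loadbalance: false

# logstash.yml -- a single filter worker thread (equivalent to starting Logstash with -w 1)
pipeline.workers: 1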

All of that considered, the only thing I can think of is to send entire documents to Logstash using Filebeat's multiline configuration (there is an example of how to do this in a different Discuss topic), and then use Logstash to extract metadata from the first line before splitting the remainder into multiple events.
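
Roughly, the Filebeat side is a multiline setup that glues every line of a file into a single event; something like the untested sketch below (the path is a placeholder, the never-matching pattern is just one commonly-suggested trick for "whole file as one event", and it assumes each file is completely written before Filebeat picks it up):

# filebeat.yml -- aggregate every line of a file into one event
filebeat.inputs:
  - type: log
    paths:
      - /path/to/documents/*.csv     # placeholder
    multiline.pattern: '^THIS_PATTERN_NEVER_MATCHES$'
    multiline.negate: true
    multiline.match: after
    multiline.max_lines: 100000      # must exceed the longest document
    multiline.timeout: 5s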

The following pseudo-code hasn't been tested, but may be helpful to convey the process I envision:

input {
  beats {
    # ...
  }
}
filter {
  # first, break the giant event into "first" line and "rest" to contain the rest of the lines:
  grok {
    pattern_definitions => {
      "NOTNEWLINE" => "[^\n]*"
      "ULTRAGREEDYDATA" => "(.|\r|\n)*"
    }
    match => {
      "message" => "\A%{NOTNEWLINE:[@metadata][first]}\n%{ULTRAGREEDYDATA:[@metadata][rest]}\Z"
    }
    remove_field => ["message"]
  }
  
  # then process the first line, extracting bits to `[@metadata][shared]`
  csv {
    source => "[@metadata][first]"
    columns => [
      "[@metadata][shared][datetime]",
      "[@metadata][shared][column1]",
      "[@metadata][shared][column2]",
      "[@metadata][shared][column3]"
    ]
    separator => ";"
    skip_empty_columns => true
  }

  # now that we are done extracting the first row, split the event into one event per line in the "rest";
  # each resulting event will have everything we already put into `[@metadata][shared]`.
  split {
    field => "[@metadata][rest]"
  }

  # now we're processing individual rows
  csv {
    source => "[@metadata][rest]"
    columns => [
      "datetime",
      "column1"
    ]
    separator => ";"
    skip_empty_columns => true
  }
  date {
    match => ["datetime", "yyyy-MM-dd HH:mm:ss.SSSSSS"]
    remove_field => ["datetime"]
  }

  # from here it's up to you; you can copy fields from the shared metadata onto the event:
  mutate {
    copy => {
      "[@metadata][shared][column3]" => "column3"
    }
  }
}
output {
  # ...
}
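
To make the flow concrete, a made-up `;`-separated document like

2018-01-05 10:00:00.000000;batch-42;site-a;building-7
2018-01-05 10:00:01.123456;12.7
2018-01-05 10:00:02.234567;13.1

would arrive from Filebeat as one event; grok puts the first line into `[@metadata][first]` and everything else into `[@metadata][rest]`; the first csv fills `[@metadata][shared][...]`; split then yields one event per remaining line, each still carrying `[@metadata][shared]`; and because `[@metadata]` is never written to outputs, the final mutate/copy is what actually persists the shared column(s) onto each event.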