Logstash aggregate plugin with the same key

Assuming I have a file like this:

start k1 v11
other lines
end   k1 i12
other lines
other lines
start k1 v21
anything else
end   k1 i22
anything else
anything else
anything else
start k1 v31
anything else
anything else
end   k1 i32

I'd like to get events like:

k1 v11 i12
k1 v21 i22
k1 v31 i32

i.e. join each start/end pair of lines, extracting kA and vBC from the start line and iDE from the end line.

With this config:

filter {
  grok {
    match => { 'message' => '^start (?<s>\w+) +(?<v>.*)' }
    add_field => { 'type' => 'v' }
  }

  if (! [type]) {
    grok {
      match => { 'message' => '^end   (?<s>\w+) +(?<i>.*)' }
      add_field => { 'type' => 'i' }
    }
  }

  if (! [type]) {
    drop {}
  }

  if ([type] == 'v') {
    aggregate {
      code => "map['v'] = event.get('v')"
      map_action => 'create'
      task_id => '%{s}'
    }

    drop {}
  }

  if ([type] == 'i') {
    aggregate {
      code => "event.set('v', map['v'])"
      end_of_task => true
      map_action => 'update'
      task_id => '%{s}'
      timeout => 60
    }
  }

}

I'm getting this:

k1 v11 i12
k1 %{v} i22
k1 %{v} i32

I assume that because the keys are the same, all but the first event are getting their value dropped. Is there any way to fix this?

I wanted to avoid using the multiline codec here for two reasons. First, the gap between a start line and its end line can be huge (i.e. the number of "other lines" and "anything else" lines can be huge). Second, those other lines can contain embedded content that makes the regexp to filter them out harder to write.

Add '--pipeline.batch.size 1' to the command line, or set 'pipeline.batch.size: 1' for that specific pipeline in pipelines.yml (or ...).

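What is happening is that the first 125 lines of the file (one batch at the default batch size) go through the first aggregate filter before any of them reach the second aggregate. By the time the start lines for v21 and v31 are processed, the map for k1 created by v11 still exists, so with map_action 'create' they change nothing. When the first type i event is then processed by the second aggregate, end_of_task deletes the map entry for that task_id. When the next two type i events go through the filter there is no map for the task_id, so the filter is a no-op.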
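To make the batching explanation concrete, here is a simplified Python model of the pipeline. It is an illustration under stated assumptions, not how Logstash is actually implemented. Each filter stage is applied to the whole batch before the next stage runs. The first aggregate only stores a value when no map exists for the task_id (map_action 'create'). The second aggregate copies the value and deletes the map (end_of_task). The helper names parse and run are hypothetical. The output format '%{s} %{v} %{i}' is also assumed, since the thread does not show the output section.

```python
import re
from typing import Dict, List, Optional

# The input file from the question (end lines use three spaces, like the grok pattern).
LINES = [
    "start k1 v11", "other lines", "end   k1 i12", "other lines", "other lines",
    "start k1 v21", "anything else", "end   k1 i22", "anything else",
    "anything else", "anything else", "start k1 v31", "anything else",
    "anything else", "end   k1 i32",
]

START = re.compile(r"^start (?P<s>\w+) +(?P<v>.*)")
END = re.compile(r"^end   (?P<s>\w+) +(?P<i>.*)")


def parse(line: str) -> Optional[Dict[str, str]]:
    """Hypothetical stand-in for the two grok filters plus the first drop:
    tag start lines 'v', end lines 'i', and return None for everything else."""
    m = START.match(line)
    if m:
        return {"type": "v", **m.groupdict()}
    m = END.match(line)
    if m:
        return {"type": "i", **m.groupdict()}
    return None


def run(lines: List[str], batch_size: int) -> List[str]:
    maps: Dict[str, Dict[str, str]] = {}  # aggregate maps, keyed by task_id (%{s})
    out: List[str] = []
    for offset in range(0, len(lines), batch_size):
        # Each stage below sees the WHOLE batch before the next stage runs.
        batch = [e for e in map(parse, lines[offset:offset + batch_size]) if e]

        # First aggregate (map_action 'create'): only stores v if no map exists yet.
        for e in batch:
            if e["type"] == "v" and e["s"] not in maps:
                maps[e["s"]] = {"v": e["v"]}
        batch = [e for e in batch if e["type"] != "v"]  # drop {} on start events

        # Second aggregate (map_action 'update', end_of_task): copy v, delete the map.
        for e in batch:
            task_map = maps.pop(e["s"], None)
            if task_map is not None:  # no map -> the filter is a no-op
                e["v"] = task_map["v"]

        # Assumed output format '%{s} %{v} %{i}'; an unset field prints literally.
        for e in batch:
            v = e.get("v", "%{v}")
            out.append(f"{e['s']} {v} {e['i']}")
    return out


if __name__ == "__main__":
    print(run(LINES, 125))  # ['k1 v11 i12', 'k1 %{v} i22', 'k1 %{v} i32']
    print(run(LINES, 1))    # ['k1 v11 i12', 'k1 v21 i22', 'k1 v31 i32']
```

With a batch size of 125 the model reproduces the output from the question. With a batch size of 1 every event passes through all the filters before the next one enters the pipeline, so each end line finds the map created by its own start line.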

Note that with the Java execution engine enabled, Logstash will reorder events even with a single worker thread, so a solution like this is going to be fragile.

Ah, that fixed it - thanks Badger!

Are there any performance concerns that I should be aware of with having a batch size of 1 compared to the default 125?

Batching is done for performance reasons, but I do not know how much overhead it adds to use a batch size of 1.

OK thanks Badger!
