Add a field to logs between 2 specific logs

Hi,
What I am trying to do is: when I encounter a specific log (with a message containing "blockOpened"), I record somewhere that I encountered it, and then I add a field or tag to all the following logs. When I encounter another specific log (with a message containing "blockClosed"), I stop doing that.
So far I have tracked whether I encountered a "blockOpened" or "blockClosed" log in a class variable called @@blocklog, and I use a ruby filter to copy this variable into a blocklog field. However, it does not seem to be working properly: sometimes the blocklog field has the right value, sometimes not, so maybe I am doing something wrong.
Here is my current code:

if [log][message] =~ "block" {
    ruby {
        # init runs once per filter instance at startup, not per event
        init => '@@blocklog="null"'
    }
    if [log][message] =~ "blockOpened" {
        ruby {
            # record that a block has been opened
            code => '@@blocklog="opened"'
        }
    } else if [log][message] =~ "blockClosed" {
        ruby {
            # record that the block has been closed
            code => '@@blocklog="closed"'
        }
    }
} else {
    ruby {
        init => '@@blocklog="null"'
        # copy the current block state onto the event
        code => 'event.set("[blocklog]", @@blocklog)'
    }

    if [blocklog] == "opened" {
        mutate {
            add_tag => [ "inside a block" ]
        }
    }
}

Sometimes the "inside a block" tag appears on logs that are not between the "blockOpened" and "blockClosed" logs. If you have an idea of what's wrong, that would be great. Thanks!

Take a look at the pipeline.workers and pipeline.ordered options. Logstash does not preserve the order of events by default.
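
For example, in logstash.yml (a sketch; pipeline.ordered defaults to "auto", which only enforces ordering when there is a single worker):

pipeline.workers: 1
pipeline.ordered: auto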

So I tried setting pipeline.workers to 1 and pipeline.ordered to auto (which is the default value), but I get the same result as before...

Another issue you will face is that events are processed in batches of 125 events by default. That means (approximately) that 125 events go through the ruby filters that manipulate @@blocklog, and only later do those 125 events go through

if [blocklog] == "opened" {
    mutate {
        add_tag => [ "inside a block" ]
    }
}

You may find you get better results if you set pipeline.batch.size to 1.
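
For example (a sketch of logstash.yml; this trades throughput for strict one-event-at-a-time processing):

pipeline.workers: 1
pipeline.batch.size: 1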

It may make more sense to move all the pattern matching and if-then logic into a single ruby filter. As a bonus you can then change from a class variable to an instance variable.
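
A minimal sketch of that idea, assuming the message really is in [log][message] (using event.tag inside the filter means no separate conditional or mutate is needed):

ruby {
    init => '@blocklog = "null"'
    code => '
        msg = event.get("[log][message]").to_s
        if msg.include? "blockOpened"
            @blocklog = "opened"
        elsif msg.include? "blockClosed"
            @blocklog = "closed"
        elsif @blocklog == "opened"
            # mark and tag every event seen while the block is open
            event.set("[blocklog]", @blocklog)
            event.tag("inside a block")
        end
    '
}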

Thanks! So I set pipeline.batch.size to 1, but logs are processed much more slowly, which is an issue. I'll try moving all the logic inside the ruby filter. And if I do so, will I be able to put pipeline.batch.size back to its previous size (125, it seems), or do I have to keep it at 1?

If you do everything in a single ruby filter then you can increase the pipeline.batch.size back to the default or to any other value.

Ok so I put nearly everything in the ruby filter:

ruby {
    init => '@blocklog="null"'
    code => 'if event.get("[log][message]").include? "blockOpened"
                 @blocklog="opened"
             elsif event.get("[log][message]").include? "blockClosed"
                 @blocklog="closed"
             else
                 event.set("[blocklog]", @blocklog)
             end'
}

if [blocklog] == "opened" {
    mutate {
        add_tag => [ "inside a block" ]
    }
}

And I reset pipeline.batch.size to its original value, and now I have the same problem as before... Did I do something wrong?

Provided pipeline.ordered is maintaining order, I would expect that code to only add the tag to lines between the "blockOpened" and "blockClosed" lines.

If you want to test whether events are being processed in order, you could use

ruby { init => '@foo = 0' code => '@foo += 1; event.set("foo", @foo)' }

and see if

output { stdout { codec => rubydebug } }

shows events in the right order.
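
With in-order processing, the foo values should be strictly increasing in the rubydebug output, something like (illustrative, other fields elided):

{
    "foo" => 1,
    ...
}
{
    "foo" => 2,
    ...
}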

Well, it seems like they are showing in the right order; I don't understand... Could it be that

if [blocklog] == "opened" {
    mutate {
        add_tag => [ "inside a block" ]
    }
}

is evaluated at a later point, by which time [blocklog] has had time to change value?

I cannot think of any way that could happen.

Fundamentally, you have no guarantee that events will be processed in order. pipeline.ordered only functions if pipeline.workers is set to 1.

You could try storing the file offset or timestamp and only respecting the block setting if the event occurs later in time or later in the log, but that only deals with half of your error cases.
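
A minimal sketch of the timestamp variant, assuming each event carries a usable @timestamp (this suppresses the tag for events that are older than the most recent "blockOpened", but late-arriving events from inside a block are still missed, which is the other half of the error cases):

ruby {
    init => '@opened_at = nil'
    code => '
        msg = event.get("[log][message]").to_s
        ts  = event.get("@timestamp").to_f
        if msg.include? "blockOpened"
            @opened_at = ts
        elsif msg.include? "blockClosed"
            # only honour the close if it happened after the open
            @opened_at = nil if @opened_at && ts >= @opened_at
        elsif @opened_at && ts >= @opened_at
            event.tag("inside a block")
        end
    '
}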

Yeah, but what's weird is that pipeline.workers was set to 1. Anyway, even if it had worked with 1 worker, I don't think that would have been a good solution, because I would still have lost performance, which is not desirable.
I think I will have to solve this issue before sending the logs to Logstash.
Thanks for your help and time!
