Logstash Filter with wrong result for deal with lots event at the same time

Hi,

I use Winlogbeat to transmit Windows Event Log. My Logstash Filter use for parsing the "message" in the Json.
The message content will like this, "RuleName: technique_id=T1047,technique_name=Windows Management Instrumentation." And, my purpose is splitting the message into three part, "RuleName, Technique_id and Technique_name." Here is my code. I write the filter in Ruby.

filter {
  ruby {
    code => '
      # Initialize
      rulename_input = " "
      technique_id = " "
      technique_name = " "

      array = event.get("[message]").split("\n")
      Message = array[1]
      RuleName = Message.split(": ")

      if RuleName[1] == " " or RuleName[1] == nil
        rulename_input = " "
        technique_id = " "
        technique_name = " "
      else 
        rulename_input = RuleName[1]
        tech = RuleName[1].split(",")

        tec = tech[0].split("=")
        technique_id = tec[1]

        tec = tech[1].split("=")
        technique_name = tec[1]
      end

      event.set("RuleName",rulename_input)
      event.set("Technique_id",technique_id)
      event.set("Technique_name",technique_name)
    '
  }
}

The filter result is usually correct in most situation. Below is correct picture by Kibana.

But, if there are lots of event log send from Winlogbeat at the same time, it will be wrong. The wrong result as below:


How do I fix it?

In ruby, any "variable" that starts with a capital letter is a constant. I am not going to explain the scope of constants in ruby, but suffice to say that if you configure

input { generator { count => 1 lines => [ 'first', 'second' ] } }
filter {
    ruby {
        code => '
            if event.get("message") =="first"
                Constant = "Foo"
            end
            event.set("someField", Constant)
        '
    }
}
output { stdout { codec => rubydebug { metadata => false } } }

then you get

{
   "message" => "first",
  "@version" => "1",
"@timestamp" => 2020-05-05T15:44:14.523Z,
  "sequence" => 0,
 "someField" => "Foo"
}
{
   "message" => "second",
  "@version" => "1",
"@timestamp" => 2020-05-05T15:44:14.536Z,
  "sequence" => 0,
 "someField" => "Foo"
}

whereas if you configure

input { generator { count => 1 lines => [ 'first', 'second' ] } }
filter {
    ruby {
        code => '
            if event.get("message") =="first"
                constant = "Foo"
            end
            event.set("someField", constant)
        '
    }
}
output { stdout { codec => rubydebug { metadata => false } } }

then you get

{
   "message" => "first",
  "@version" => "1",
"@timestamp" => 2020-05-05T15:43:59.981Z,
  "sequence" => 0,
 "someField" => "Foo"
}
{
   "message" => "second",
  "@version" => "1",
"@timestamp" => 2020-05-05T15:44:00.114Z,
  "sequence" => 0,
 "someField" => nil
}

so rename "Message" and "RuleName" as "message" and "ruleName".

1 Like

Thank you for your help , and now it works correctly! Thank you so much!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.