Do global variables override the result of matching?

Hi,

Maybe the subject line is not clear, so I will try to explain it.
I have a log file that looks like this

06/22/18 00:41:11.719 PID = 1036
...
06/22/18 00:41:11.757 host is "host1"
...
06/22/18 00:41:11.772 JOB ID: 87.113
...
06/22/18 00:41:12.813 user user1 
...
06/22/18 01:52:52.075 status=0 
...
06/22/18 01:53:23.099 PID = 17089
...
06/22/18 01:53:23.129 host is "host2"
...
06/22/18 01:53:23.145 JOB ID: 2539.0
...
06/22/18 01:53:23.234 user user2 
...
06/22/18 02:51:08.274 status=0 
...

The ... stands for other lines I am not interested in.
Every time a job starts (the line matching PID), I want to print out the start timestamp, the host, the jobid and the user. When the job ends (the line matching status), I want to print all of that again, plus the end timestamp and the status itself. The rest of the events can be dropped.

With a pattern file like this

START_TIMESTAMP %{DATE_US} %{TIME}
END_TIMESTAMP %{START_TIMESTAMP:endtime} (status=)%{NUMBER:status}

this code works fine:

filter {

    grok {
        add_tag => [ "matched", "starting" ]
        match => { "message" => "%{START_TIMESTAMP:starttime} (?=PID =)" }
        patterns_dir => ["/tmp/pat/"]
        break_on_match => false 
    }   
    grok {
        add_tag => [ "matched", "submithost" ]
        match => { "message" => "(?<=host is \")%{HOSTNAME:submithost}" }
        patterns_dir => ["/tmp/pat/"]
        break_on_match => false 
    }   
    grok {
        add_tag => [ "matched", "user" ]
        match => { "message" => "(?<=user )%{WORD:user}" } 
        patterns_dir => ["/tmp/pat/"]
        break_on_match => false 
    }   
    grok {
        add_tag => [ "matched", "jobid" ]
        match => { "message" => "(?<=JOB ID: )%{NUMBER:jobid}" }
        patterns_dir => ["/tmp/pat/"]
        break_on_match => false 
    }   
    grok {
        add_tag => [ "matched", "finished" ]
        match => { "message" => "%{END_TIMESTAMP}"}
        patterns_dir => ["/tmp/pat/"]
        break_on_match => false 
    }   


    if "matched" not in [tags] {
        drop { } 
    }   

    if "starting" in [tags] {
            ruby { code => '@@starttime = event.get("starttime")' }
    } else {
            ruby { code => 'event.set("starttime", @@starttime)' }
            if "submithost" in [tags] {
                    ruby { code => '@@submithost = event.get("submithost")' }
            } else {
                    ruby { code => 'event.set("submithost", @@submithost)' }
		    if "jobid" in [tags] {
                            ruby { code => '@@jobid = event.get("jobid")' }
                    } else {
                            ruby { code => 'event.set("jobid", @@jobid)' }
                            if "user" in [tags] {
                                    ruby { code => '@@user = event.get("user")' }
			    } else {
                                    ruby { code => 'event.set("user", @@user)' }
                            }
                    }
            }
    }


    if "finished" not in [tags] {
        if "user" not in [tags] {
            drop { }
        }
    }

}

I get this output, which is what I expected.

{
          "user" => "user1",
    "submithost" => "host1",
     "starttime" => "06/22/18 00:41:11.719",
         "jobid" => "87.113"
}
{
       "endtime" => "06/22/18 01:52:52.075",
        "status" => "0",
          "user" => "user1",
    "submithost" => "host1",
     "starttime" => "06/22/18 00:41:11.719",
         "jobid" => "87.113"
}
{
          "user" => "user2",
    "submithost" => "host2",
     "starttime" => "06/22/18 01:53:23.099",
         "jobid" => "2539.0"
}
{
       "endtime" => "06/22/18 02:51:08.274",
        "status" => "0",
          "user" => "user2",
    "submithost" => "host2",
     "starttime" => "06/22/18 01:53:23.099",
         "jobid" => "2539.0"
}

However, if I try to rewrite the logic in a simpler way, to avoid that long if-else structure, like this, it doesn't work anymore, and I don't see why.

[...]

if "starting" in [tags] {
        ruby { code => '@@starttime = event.get("starttime")' }
}
if "submithost" in [tags] {
        ruby { code => '@@submithost = event.get("submithost")' }
}
if "jobid" in [tags] {
        ruby { code => '@@jobid = event.get("jobid")' }
}
if "user" in [tags] {
        ruby { code => '@@user = event.get("user")' }
}


if "user" in [tags] {
        ruby { code => '
                event.set("starttime", @@starttime)
                event.set("submithost", @@submithost)
                event.set("jobid", @@jobid)
                event.set("user", @@user)
                '
        }
}
if "finished" in [tags] {
        ruby { code => '
                event.set("starttime", @@starttime)
                event.set("submithost", @@submithost)
                event.set("jobid", @@jobid)
                event.set("user", @@user)
                '
        }
}

[...]

Now all the JSON docs in the output are almost identical to each other. It seems like I have created some global variables that override the fields for each line.

Maybe it is not possible to do it this way, and the only way is with the if-else-if-else... mechanism?

Any comment is more than welcome.
Thanks a lot in advance.

You do not need to use tags. You can check for field existence to see whether the patterns matched:

    grok {
        pattern_definitions => { "START_TIMESTAMP" => "%{DATE_US} %{TIME}" }
        match => { "message" => [
            '^%{START_TIMESTAMP:starttime} PID = ',
            '^%{START_TIMESTAMP} host is "%{HOSTNAME:submithost}"',
            '^%{START_TIMESTAMP} JOB ID: %{NUMBER:jobid}',
            '^%{START_TIMESTAMP} user %{WORD:user}',
            '^%{START_TIMESTAMP:endtime} status=%{NUMBER:status}'
        ] }
    }

    if [jobid]           { ruby { code => '@@jobid      = event.get("jobid");      event.cancel' } }
    else if [submithost] { ruby { code => '@@submithost = event.get("submithost"); event.cancel' } }
    else if [starttime]  { ruby { code => '@@starttime  = event.get("starttime");  event.cancel' } }
    else if [user]       { ruby { code => '@@user       = event.get("user"); event.set("starttime", @@starttime); event.set("submithost", @@submithost); event.set("jobid", @@jobid)' } }
    else if [status]     { ruby { code => 'event.set("starttime", @@starttime); event.set("jobid", @@jobid); event.set("submithost", @@submithost); event.set("user", @@user)' } }
    else { drop {} }

Ruby's instance- and class-variables are not thread-safe storage mechanisms and will produce surprising results when Logstash processes multiple log messages simultaneously. In general, they should not be used.
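
A minimal standalone sketch of the failure mode (plain ruby, not Logstash config; the Worker class and the sleep are hypothetical stand-ins for two pipeline workers sharing one class variable):

class Worker
  def handle(value)
    @@state = value       # this worker stores its own value...
    sleep(rand / 100.0)   # ...another thread can run here and overwrite @@state
    @@state               # ...so the read-back may return the other worker's value
  end
end

threads = ["A", "B"].map { |v| Thread.new { puts Worker.new.handle(v) } }
threads.each(&:join)
# with unlucky scheduling this prints "A A" or "B B" instead of "A" and "B"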


But beyond that, the format of log messages you're looking to consume is also not concurrency-safe; what happens if you have two processes outputting logs at the same time?

Below, we have an example of two processes interleaving; when looking at any individual log message, it is impossible to associate it with a specific begin event.

06/22/18 00:41:11.719 PID = 1036
06/22/18 00:41:11.757 host is "host1"
06/22/18 00:41:11.772 JOB ID: 87.113
06/22/18 00:41:12.813 user user1
06/22/18 00:52:11.758 PID = 1234
06/22/18 00:52:11.771 host is "host1"
06/22/18 00:52:11.772 JOB ID: 87.114
06/22/18 00:53:12.828 user user1
06/22/18 01:52:52.075 status=0
06/22/18 02:12:57.027 status=0

From the messages you have shown, it looks like host and PID could be used to uniquely identify a job (PIDs are reused, but the odds of the same host/PID combination representing a different job are low enough to ignore for now).

The following is the type of log that could be useful; each line carries the timestamp, the PID and host that together identify the job, and the remaining data (job_id or status).

2018-06-22T00:41:11.719 PID=1036 host=host1 job_id=87.113
2018-06-22T00:48:57.918 PID=1451 host=host1 job_id=87.114
2018-06-23T01:53:23.099 PID=1036 host=host1 status=0
2018-06-22T01:57:02.317 PID=1872 host=host1 job_id=87.115
2018-06-23T01:59:17.618 PID=1036 host=host1 status=1
2018-06-23T02:08:47.825 PID=1872 host=host1 status=0
2018-06-22T03:37:22.186 PID=2981 host=host1 job_id=87.116
2018-06-23T03:38:01.971 PID=2981 host=host1 status=1

Using the Aggregate filter plugin (with a single pipeline worker: -w 1 on the command line, or pipeline.workers: 1 in the yaml), we can group these start/end events:

filter {
  dissect {
    mapping => {
      "message" => "%{timestamp} %{[@metadata][kv]}"
    }
  }
  kv {
    source => "[@metadata][kv]"
    add_field => {
      "[@metadata][task_id]" => "%{host}|%{PID}"
    }
  }
  date {
    match => ["timestamp", "ISO8601"]
  }
  if [status] {
    aggregate {
      task_id => "%{[@metadata][task_id]}"
      end_of_task => true
      code => '
        # record the end time, then copy every field remembered
        # from earlier events of this task onto the final event
        timestamp = event.get("timestamp")
        event.set("end_timestamp", timestamp)
        map.each { |k,v| event.set(k, v) }
      '
    }
  } else {
    aggregate {
      task_id => "%{[@metadata][task_id]}"
      code => '
        # fold this event into the map, keeping the earliest @timestamp seen
        earliest = [map["@timestamp"], event.get("@timestamp")].compact.min
        map.update(event.to_hash)
        map["@timestamp"] = earliest
      '
    }
    drop { }
  }
}

Which yields the following, appropriately associating each status-bearing message with the event(s) that preceded it:

{
             "host" => "host1",
          "message" => "2018-06-22T00:41:11.719 PID=1036 host=host1 job_id=87.113",
        "@timetamp" => 2018-06-22T00:41:11.719Z,
              "PID" => "1036",
           "status" => "0",
           "job_id" => "87.113",
        "timestamp" => "2018-06-22T00:41:11.719",
         "@version" => "1",
       "@timestamp" => 2018-06-22T00:41:11.719Z,
    "end_timestamp" => "2018-06-23T01:53:23.099"
}
{
             "host" => "host1",
          "message" => "2018-06-23T01:59:17.618 PID=1036 host=host1 status=1",
              "PID" => "1036",
           "status" => "1",
        "timestamp" => "2018-06-23T01:59:17.618",
         "@version" => "1",
       "@timestamp" => 2018-06-23T01:59:17.618Z,
    "end_timestamp" => "2018-06-23T01:59:17.618"
}
{
             "host" => "host1",
          "message" => "2018-06-22T01:57:02.317 PID=1872 host=host1 job_id=87.115",
        "@timetamp" => 2018-06-22T01:57:02.317Z,
              "PID" => "1872",
           "status" => "0",
           "job_id" => "87.115",
        "timestamp" => "2018-06-22T01:57:02.317",
         "@version" => "1",
       "@timestamp" => 2018-06-22T01:57:02.317Z,
    "end_timestamp" => "2018-06-23T02:08:47.825"
}
{
             "host" => "host1",
          "message" => "2018-06-22T03:37:22.186 PID=2981 host=host1 job_id=87.116",
        "@timetamp" => 2018-06-22T03:37:22.186Z,
              "PID" => "2981",
           "status" => "1",
           "job_id" => "87.116",
        "timestamp" => "2018-06-22T03:37:22.186",
         "@version" => "1",
       "@timestamp" => 2018-06-22T03:37:22.186Z,
    "end_timestamp" => "2018-06-23T03:38:01.971"
}
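
For reference, a minimal invocation of the pipeline above with a single worker might look like this (the config file name is just a placeholder):

bin/logstash -w 1 -f aggregate.conf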

Thanks, I am going to try it out.

Hi, you said they are not "thread-safe". I was running the pipeline with the -w 1 option. Does your comment still apply?

Note that this all assumes you have

pipeline.java_execution: false

in /etc/logstash/logstash.yml in addition to -w 1. The java execution engine results in the lines being re-ordered before they are fed to the single worker thread. I get the impression that that is not regarded as a bug.
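
Spelled out in logstash.yml, that combination would be:

# force the ordered ruby execution engine and a single worker thread,
# so events reach the filters one at a time, in order
pipeline.java_execution: false
pipeline.workers: 1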

I was running from the command line, without --java-execution, so that shouldn't be an issue.

Just for completeness: I have been able to rewrite the logic entirely in ruby, so there is no need to mix Logstash conditionals and ruby code:

ruby { code => '

    if event.get("starttime")
        @@starttime = event.get("starttime")   
        event.cancel
    elsif event.get("submithost")
        @@submithost = event.get("submithost")          
        event.cancel
    elsif event.get("jobid")
        @@jobid = event.get("jobid")
        event.cancel
    elsif event.get("user")
        @@user = event.get("user")
        event.set("starttime", @@starttime)
        event.set("jobid", @@jobid)
        event.set("submithost", @@submithost)
        event.set("user", @@user)
    elsif event.get("status")
        event.set("starttime", @@starttime)
        event.set("jobid", @@jobid)
        event.set("submithost", @@submithost)
        event.set("user", @@user)
    end

'}
