First foray into the world of aggregate - string should be a number?

So with the help of Badger, I got a working grok filter.
Now I need to change it so that it aggregates the lines into one event.

One event looks like this:

  2019-01-12 16:25:01.889 0EF4 [BERLIN_PSL -] Starting to send messages on output channel 1001, queue 7
  2019-01-12 16:25:01.889 0EF4 [BERLIN_PSL -] Call DialogServer.SendMessages
  2019-01-12 16:25:02.221 0EF4 [BERLIN_PSL -] End DialogServer.SendMessages last 0 seconds
  2019-01-12 16:25:02.222 0EF4 [BERLIN_PSL -] Sent 1 messages in transaction 1 on output channel 1001, queue 7
  2019-01-12 16:25:02.222 0EF4 [BERLIN_PSL -] Call DialogServer.SendMessages
  2019-01-12 16:25:02.237 0EF4 [BERLIN_PSL -] End DialogServer.SendMessages last 0 seconds
  2019-01-12 16:25:02.238 0EF4 [BERLIN_PSL -] Completed sending messages on output channel 1001, queue 7

So my filter looks like this:

grok {
  match => {
     "message" => [ "^  %{NOTSPACE:date}%{SPACE}%{NOTSPACE:clock} %{WORD:taskid} \[%{WORD:instance} -] %{WORD:status} %{GREEDYDATA:restOfLine}"]
  }
}
grok {
  match => {
   "restOfLine" => [
     "to send %{GREEDYDATA}",
     "%{INT:batchsent} messages %{GREEDYDATA}",
     "DialogServer.SendMessages last %{INT:batchduration} seconds$",
     "sending %{GREEDYDATA}"
     ]
   }
}

This takes the four lines that I'm interested in and places the values in fields.
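
For example, the 'Sent 1 messages...' line ends up with roughly these fields (all values are strings at this point):

  date => "2019-01-12"
  clock => "16:25:02.222"
  taskid => "0EF4"
  instance => "BERLIN_PSL"
  status => "Sent"
  batchsent => "1"
  restOfLine => "1 messages in transaction 1 on output channel 1001, queue 7"
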
Then I've added these blocks to get the lines aggregated.

Startup, setting the two counters to 0:

if [status] == "Starting" {
  aggregate {
    task_id => "%{taskid}"
    code => "map['sent'] = 0 ; map['duration'] = 0"
    map_action => "create"
  }
}

This accumulates the duration (in theory):

if [status] == "End" {
  aggregate {
    task_id => "%{taskid}"
    code => "map['duration'] += event.get('batchduration')"
    map_action => "update"
  }
}

This accumulates the sent counts (in theory):

if [status] == "Sent" {
  aggregate {
    task_id => "%{taskid}"
    code => "map['sent'] += event.get('batchsent')"
    map_action => "update"
  }
}

And this should then generate an event with the accumulated fields:

if [status] == "Completed" {
  aggregate {
    task_id => "%{taskid}"
    code => "event.set('duration', map['duration']) ; event.set('sent', map['sent'])"
    end_of_task => true
    timeout => 1000
  }
}

But if I look at the Logstash debug log, I get a few errors about the variable type:

[2019-01-15T13:00:47,802][ERROR][logstash.filters.aggregate] Aggregate exception occurred {:error=>#<TypeError: String can't be coerced into Fixnum>, :code=>"map['duration'] += event.get('batchduration')", :map=>{"sent"=>0, "duration"=>0}, :event_data=>{"date"=>"2019-01-12", "restOfLine"=>"DialogServer.SendMessages last 0 seconds"...

[2019-01-15T13:00:47,805][ERROR][logstash.filters.aggregate] Aggregate exception occurred {:error=>#<TypeError: String can't be coerced into Fixnum>, :code=>"map['sent'] += event.get('batchsent')", :map=>{"sent"=>0, "duration"=>0}, :event_data=>{"date"=>"2019-01-12", "restOfLine"=>"1 messages in transaction 1 on output channel 1001, queue 7", ...

[2019-01-15T13:00:47,809][ERROR][logstash.filters.aggregate] Aggregate exception occurred {:error=>#<TypeError: String can't be coerced into Fixnum>, :code=>"map['duration'] += event.get('batchduration')", :map=>{"sent"=>0, "duration"=>0}, :event_data=>{"date"=>"2019-01-12", "restOfLine"=>"DialogServer.SendMessages last 0 seconds"...

Now, how do I make it understand that these are numbers, and not strings?

It's telling you that it cannot convert the strings into numbers. So you have two choices. You can parse them as numbers in the first place, doing the conversion in the grok patterns using a :int suffix, such as %{INT:batchsent:int}. Or you can explicitly convert them in Ruby:

code => "map['sent'] += event.get('batchsent').to_i"

Hi Badger

OK, so I added it to the grok filter like this:

     "%{INT:batchsent:long} messages %{GREEDYDATA}",
     "DialogServer.SendMessages last %{INT:batchduration:long} seconds$",

I changed it to long, as I'm not sure how big the batch sizes can get.
But I still get type errors. Is that because the = 0 is not a number? And how do I make it so it knows it's a number?

[2019-01-15T14:31:47,291][ERROR][logstash.filters.aggregate] Aggregate exception occurred {:error=>#<TypeError: String can't be coerced into Fixnum>, :code=>"map['duration'] += event.get('batchduration')", :map=>{"sent"=>0, "duration"=>0}, :event_data=>{"date"=>"2019-01-12", "restOfLine"=>"DialogServer.SendMessages last 0 seconds", ...

[2019-01-15T14:31:47,302][ERROR][logstash.filters.aggregate] Aggregate exception occurred {:error=>#<TypeError: String can't be coerced into Fixnum>, :code=>"map['sent'] += event.get('batchsent')", :map=>{"sent"=>0, "duration"=>0}, :event_data=>{"date"=>"2019-01-12", "restOfLine"=>"1 messages in transaction 1 on output channel 1001, queue 7", ...

[2019-01-15T14:31:47,311][ERROR][logstash.filters.aggregate] Aggregate exception occurred {:error=>#<TypeError: String can't be coerced into Fixnum>, :code=>"map['duration'] += event.get('batchduration')", :map=>{"sent"=>0, "duration"=>0}, :event_data=>{"date"=>"2019-01-12", "restOfLine"=>"DialogServer.SendMessages last 0 seconds", ...

According to the documentation, 'Currently the only supported conversions are int and float'.


Darned, I'm hitting all the bumps, aren't I?
OK, so after changing it to 'int' it does convert it; at least I don't see any errors in the logfile.

But I'm unsure if it works, because I don't see the usual 'event' dump in the debug output in the logs.
I have it set to:
stdout { codec => rubydebug }

Usually I see a formatted output with the fields and the values dumped in the log, but I don't see that here.
Just for the fun of it, I also tried sending it to Elasticsearch, to see if I could create a new index, but the index isn't created?

And I do see the 'Completed' line in the logfile:

[2019-01-15T14:45:32,049][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2019-01-15T14:45:30.269Z, "Instance name"=>"BERLIN_PSL", "source"=>"C:\\test\\MHSendMessagesService-MHSendMessagesThread-Events-6.log", "offset"=>1148, "taskid"=>"0EF4", "duration"=>0, "sent"=>1, "input"=>{"type"=>"log"}, "message"=>" 2019-01-12 16:25:02.238 0EF4 [BERLIN_PSL -] Completed sending messages on output channel 1001, queue 7", "beat"=>{"name"=>"BERLIN40001", "hostname"=>"BERLIN40001", "version"=>"6.5.1"}, "host"=>{"name"=>"BERLIN40001"}, "@version"=>"1", "tags"=>["SENDING", "beats_input_codec_plain_applied"], "fields"=>{"logtype"=>"portraitsending"}, "prospector"=>{"type"=>"log"}, "status"=>"Completed"}}

I'm not sure what it is supposed to look like in the logfile, but I see a lot of:

[2019-01-15T14:40:10,582][DEBUG][logstash.filters.aggregate] Aggregate flush call with {:final=>false}

The aggregate worked. When I ran it I did get the usual rubydebug output, and it included those fields on the final message.


Hi Badger

OK, that IS weird: yesterday I didn't see it, today I do :slight_smile:
One thing that baffles me, though, is that I get multiple entries sent to ES, even though I only have one event (consisting of multiple lines). I thought aggregate would only send the 'resulting' entry out of Logstash?

Oh, and it seems that I cannot rely on the order of the lines:

mutate {
  add_field => {
    "datetime" => "%{date} %{clock}"
  }
  remove_field => [ "date", "clock", "restOfLine" ]
  rename => { "instance" => "Instance name" }
}

So now the timestamp conversion doesn't work; if I do not remove 'date' and 'clock', it works.
I've added a separate mutate -> remove_field below the timestamp matching, and then it seems to work.
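Roughly like this (the date pattern here is based on the log format above):

mutate {
  add_field => { "datetime" => "%{date} %{clock}" }
  rename => { "instance" => "Instance name" }
}
date {
  match => [ "datetime", "yyyy-MM-dd HH:mm:ss.SSS" ]
}
mutate {
  remove_field => [ "date", "clock", "restOfLine", "datetime" ]
}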

Now I've been asked to include one more line in the entry, sigh, and I'm not sure how to do that.
At the top of the logfile, above all the entries, are these lines:

Events of MHSendMessagesService

                               * STOP *

                          0810 *** START ***
                                 * Module: C:\Program Files (x86)\PST\Million Handshakes\Dialog Server\MHSendMessagesService.exe
                                 * Version: 6.2.0.1280
                                 * Computer: BERLIN40001
                                 * UserName: PDsvc-BERLIN40001
                                 * Address IP: 10.100.1.142
                                 * Process ID: 2388 ($954)

And they would like the Process ID to be stored together with all the events from the logfile.
Is that even possible, to have the field repeated on all the events?

So two challenges:
Why do I get events for all the lines in the logfile, and not just for the resulting event?
How do I add the Process ID that is unique for the logfile itself, but is only shown at the top of the logfile?

If you don't want the lines that match Starting or End or Sent then drop {} them after aggregating the fields of interest.
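
For example, a sketch for the Sent case (the other two are analogous):

if [status] == "Sent" {
  aggregate {
    task_id => "%{taskid}"
    code => "map['sent'] += event.get('batchsent')"
    map_action => "update"
  }
  drop { }
}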

You have "--pipeline.workers 1" on the command line, right?


Stash the process id in a class variable in a ruby filter.

if [message] =~ "Process ID:" {
    grok { match => { "message" => [ "^%{SPACE}\* Process ID: %{NUMBER:pid}" ] } }
    ruby { code => '@@pid = event.get("pid")' }
}

Then later you can add it to the event using another ruby filter.

ruby { code => 'event.set("ProcessID", @@pid)' }

If you want to add it to the event in the aggregate filter then I think it would need to be a global variable ($pid), since @@pid will only be visible in ruby filters.
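
That is, something like this (a sketch):

ruby { code => '$pid = event.get("pid")' }

and then, inside the aggregate filter's code option:

code => "event.set('ProcessID', $pid)"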


Oh, it's that simple with the drop. OK, I get it now; the way I understood the doc page, it was dropped automagically.
I have pipeline.workers set to 1, yes. It's set in the container's environment variables.
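
That is, the container gets this environment variable (assuming the official image's mapping of environment variables onto logstash.yml settings):

PIPELINE_WORKERS=1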

That is very cool that I can stash it in a variable.
I'm going to add it after the first grok filter has parsed, like this I think:

if "_grokparsefailure" in [tags] {
  if [message] =~ "Process ID:" {
    grok { match => { "message" => [ "^%SPACE}\* Process ID: %NUMBER.pid}" ] } }
    ruby { code => '$pid = event.get("pid")' }
  }
  drop { }
}

I'm a bit unsure about how to understand the stashing and retrieving.
So I tried doing the $pid above, and then modified the 'Completed' aggregation to have code like this:

if [status] == "Completed" {
  aggregate {
    task_id => "%{taskid}"
    code => "event.set('duration', map['duration']) ; event.set('sent', map['sent']) ; event.set('ProcessID', $pid)"
    end_of_task => true
    timeout => 1000
  }
}

But that didn't work. The ProcessID was nil, so how do I get that done correctly?

I also tried placing the line you wrote directly in the filter (outside the aggregate filter), but that also gives me nil as the value.

I'm very impressed with your help, you are definitely my hero in this endeavour :slight_smile: Thank you!

The global variable works. It's the grok that is not working. It should be

"^%{SPACE}\* Process ID: %{NUMBER:pid}"

D'oh, you are simply a star :muscle:, darned, it is very, very cool.
Now the boss had some input: he is very impressed with the result! What a filter :exploding_head:
He is a bit worried that the TaskID of 4 hex digits could be the same on different servers, so I was thinking of just doing a mutate at the top, and using that value in the aggregate filters:

mutate {
  add_field => {
    "identity" => "%{instance}%{taskid}"
  }
}
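
Each aggregate filter then uses the combined key, e.g. for the Starting case (the others follow the same pattern):

aggregate {
  task_id => "%{identity}"
  code => "map['sent'] = 0 ; map['duration'] = 0"
  map_action => "create"
}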

And that seems to work :slight_smile: Weeheee

So now it's getting real-life data; interesting to see if it works as it's supposed to :slight_smile:

Again, a thousand thanks for your invaluable help!

I can see that some of the logfiles do not have a 'Process ID' in them, and that gives me a ruby exception:

logstash | [2019-01-17T07:52:34,964][ERROR][logstash.filters.ruby ] Ruby exception occurred: uninitialized class variable @@pid in LogStash::Filters::Ruby

I've tried:
ruby { code => "if (defined?(@@pid)).nil? then event.set('ProcessID', '') else event.set('ProcessID', @@pid) end" }

And I think the code itself works, but apparently, if @@pid has been set in one logfile, it will not be 'reset' when the next logfile starts.

I was thinking that if a new logfile is started, it should set 'ProcessID' to ''.
But then I got stuck. I was thinking that I could use the field 'offset' and store it in a variable (oldposition),
and if the offset is smaller than oldposition, it's because it has started on a new file.
But as the fields are reset for every line, I guess this has to be Ruby code as well? And then I got stuck, as I've never coded in Ruby.

Something like

ruby {
    init => '@@offset = 0'
    code => '
        # if the offset has gone backwards, a new logfile has started,
        # so forget the pid from the previous file
        o = event.get("offset")
        if o < @@offset then
            @@pid = nil
        end
        @@offset = o
    '
}
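
(One caveat: once @@pid has been assigned nil it still counts as defined, so the defined? test above would also need a check like @@pid.nil? to cover that case.)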

Great, thank you!
