Aggregate filter plugin session duration

Hello,

Reviewing ways to calculate session duration, I was able to use transforms to compute it by finding the min and max timestamps. This is fine except in cases where there is no unique session id to group by. In those cases a timeout interval is needed to track sessions. The aggregate filter plugin in Logstash seemed appropriate, and I stumbled upon this discussion attempting to do the same thing:

Here is the sample code I'm trying to build on:

aggregate {
        task_id => "%{clientip}"
        code => "map['timestamp'] ||= event.get('@timestamp').to_i"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "clientip"
        timeout => 1800
        inactivity_timeout => 600
        timeout_tags => ['_aggregatetimeout']
        timeout_code => "event.set('session_duration', map['timestamp'] - event.get('@timestamp').to_i)"
}

The code section of the aggregate filter works:

[DEBUG] 2021-07-07 12:00:48.855 [[main]>worker0] aggregate - Aggregate successful filter code execution {:code=>"map['start_time'] ||= event.get('@timestamp').to_i"}

And I can see in the following error message that the field start_time has been properly converted into an integer ("start_time"=>1622130568):

[ERROR] 2021-06-24 13:59:07.666 [[main]>worker0] aggregate - Aggregate exception occurred {:error=>#<NameError: undefined local variable or method `map' for #<LogStash::Filters::Aggregate:0x6c9b6f82>>, :timeout_code=>"event.set('session_duration', map['start_time'] - event.get('@timestamp').to_i)", :timeout_event_data=>{"start_time"=>1622130568, "@timestamp"=>2021-06-24T18:59:07.665Z, "clientip"=>"222.222.222.222", "@version"=>"1", "tags"=>["_aggregate_timeout"]}}

But as you can see in the timeout_code section of the above error message, the timestamp is not being converted into an integer:

timeout_code=>"event.set('session_duration', map['start_time'] - event.get('@timestamp').to_i)", :timeout_event_data=>{"start_time"=>1622130568, "@timestamp"=>2021-06-24T18:59:07.665Z

The final event.get('@timestamp').to_i is not doing the conversion. Any thoughts on how to resolve this? Thanks.

If I remember correctly the timeout_code cannot reference the event using event.get because it is not executing in the context of processing an event. It is off in a daemon thread that processes timeouts.

Hello,

Then would I need to add a new variable in the timeout_code to store the timestamp on the timeout?

Sorry, I did not explain that clearly. In the timeout there is an event, so you can call event.get and event.set, but the fields in the event are the contents of the map. map no longer exists at that point. Instead of map['timestamp'], use event.get('timestamp').
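A rough sketch of that lifecycle, using a plain Hash to stand in for both the map and the generated timeout event (not the real plugin internals):

```ruby
# While events for the task arrive, the 'code' block writes into the map:
map = {}
map['timestamp'] ||= 1_622_130_568   # stand-in for event.get('@timestamp').to_i

# On timeout, push_map_as_event_on_timeout turns the map's contents into
# the fields of a brand-new event; the map itself is discarded.
timeout_event_fields = map.dup

# So inside timeout_code the stored value is read back from the event:
start = timeout_event_fields['timestamp']   # i.e. event.get('timestamp')
```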

Hello,

Thanks for the explanation. I'm getting there, but could use a little more clarity just to make sure I'm doing this correctly, as I'm still running into the error. I realize my config is slightly different because I'm trying to calculate a session duration when there is no end event. Here is my config:

aggregate {
        task_id => "%{clientip}"
        timeout_timestamp_field => "@timestamp"
        code => "map['start_time'] ||= event.get('@timestamp').to_i"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "clientip"
        timeout => 600
        inactivity_timeout => 300
        timeout_tags => ['_aggregate_timeout']
        timeout_code => "event.set('session_duration', event.get('timestamp') - event.get('@timestamp').to_i)"
}

Essentially what I'm trying to do is take the start_time and subtract it from the timestamp created at the timeout_code from the timeout interval. Does this make sense?

event.get('@timestamp') will return the time when the timeout occurs (i.e. when the map entry is converted to an event). So you want to subtract the start time from that, but you are doing it the other way around, so you will get a negative duration. You set map['start_time'] but use event.get('timestamp') which does not match. That will cause an exception.

Note that the delta between @timestamp and map['start_time'] can overestimate the session duration by the amount of time it takes for the map to timeout.

    timeout => 600
    inactivity_timeout => 300

That timeout will occur if a session is inactive for 300 seconds, or 600 seconds after it started if it remains active. The aggregate filter does not provide a way to distinguish the two.
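The two expiry rules can be sketched as a small predicate (the names and structure here are my own, not the plugin's internals):

```ruby
# A map entry expires when either limit is hit. The resulting timeout event
# looks identical in both cases, which is why they cannot be told apart.
def expired?(created_at, last_seen_at, now, timeout: 600, inactivity_timeout: 300)
  (now - created_at) >= timeout || (now - last_seen_at) >= inactivity_timeout
end

expired?(0, 550, 600)   # session hit the absolute 600 s timeout
expired?(0, 100, 450)   # inactive for 350 s, past the 300 s inactivity limit
expired?(0, 400, 500)   # neither limit reached yet
```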

Hello,

Thanks for all your help with this. I've got the data parsed correctly. However, I'm not sure this is working as intended. Looking at the data in discover this is what I see:

_id                ADrLgnoBiqmWBVtiNPfH
_index             session_duration
_score             -
_type              _doc
@timestamp         Jul 7, 2021 @ 14:06:03.434
@version           1
clientip           111.111.111.111
session_duration   1,625,691,963
start_time         1,621,890,246
tags               _aggregate_timeout

start_time 1,621,890,246 = May 24, 2021 9:04:06 PM
session_duration 1,625,691,963 = July 7, 2021 9:06:03 PM
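As a quick check in plain Ruby, both of those values decode as epoch-second timestamps rather than durations:

```ruby
# Both stored values are epoch-second timestamps, not durations:
Time.at(1_621_890_246).utc   # start_time         -> 2021-05-24 21:04:06 UTC
Time.at(1_625_691_963).utc   # "session_duration" -> 2021-07-07 21:06:03 UTC
```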

Somehow session_duration is taking the current system time as the timestamp. Ideally, what I am trying to do is have session_duration represented as an integer, ideally in seconds. I also removed the inactivity_timeout. Here is my updated config:

aggregate {
        task_id => "%{clientip}"
        timeout_timestamp_field => "@timestamp"
        code => "map['start_time'] ||= event.get('@timestamp').to_i"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "clientip"
        timeout => 600
        timeout_tags => ['_aggregate_timeout']
        timeout_code => "event.set('session_duration', event.get('@timestamp').to_i) - event.get('start_time')"
}

Thanks

That should be

event.set('session_duration', event.get('@timestamp').to_i - event.get('start_time'))

Your code is subtracting [start_time] from the value returned by event.set and not doing anything with the result.
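A toy illustration of why the parentheses matter, using a minimal stand-in class (not the real Logstash Event API):

```ruby
# Minimal stand-in for an event; not the real Logstash Event API.
class ToyEvent
  def initialize; @fields = {}; end
  def set(name, value); @fields[name] = value; end   # returns value, like Hash#[]=
  def get(name); @fields[name]; end
end

e = ToyEvent.new
e.set('start_time', 100)

# Misplaced parenthesis: the field is set to the raw timestamp (250);
# the subtraction happens on set's return value and the result is discarded.
e.set('session_duration', 250) - e.get('start_time')
wrong = e.get('session_duration')   # still 250, not a duration

# Correct: compute the difference first, then store it.
e.set('session_duration', 250 - e.get('start_time'))
e.get('session_duration')   # 150
```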

Hello,

Good eye! Thanks for catching my mistake. I'm getting somewhere; it looks like I just need to fine-tune it.

My sample log data has 3 sessions in it.

IP 1 is a single session roughly 25 minutes from start to finish: 27/May/2021:08:28:30 to 27/May/2021:08:53:39

IP 2 should be two sessions. Each session from IP 2 is roughly 30 minutes, for a total of 60 minutes. The timestamps from start to finish: 24/May/2021:13:47:39 to 24/May/2021:15:38:09

If I set the interval to 1800 (30 minutes) I would expect to see 3 calculated duration fields. I am seeing 4:

session_duration: 3,567,502
session_duration: 3,566,898
session_duration: 3,801,435
session_duration: 3,802,026

I'm not sure what format these values are in or how I would convert them into seconds, but more importantly I'd expect to have only 3 session_duration fields. Here is my final config:

aggregate {
        task_id => "%{clientip}"
        timeout_timestamp_field => "@timestamp"
        code => "map['start_time'] ||= event.get('@timestamp').to_i"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "clientip"
        timeout => 1800
        timeout_tags => ['_aggregate_timeout']
        timeout_code => "event.set('session_duration', event.get('@timestamp').to_i - event.get('start_time'))"
}

Those are numbers of seconds. There are about half a million seconds per week, so those are about 7 weeks. That would be the difference between now and 24th May. If you look at the events do they have current timestamps?
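As a quick sanity check on that arithmetic (my own numbers, using the two distinct durations from the previous post):

```ruby
# Each "session_duration" is (time of timeout) - (epoch start_time),
# so it spans from the July run all the way back to the May log dates.
durations = [3_567_502, 3_801_435]
days = durations.map { |secs| (secs / 86_400.0).round }   # roughly 41 and 44 days
```

Counting back 41 and 44 days from early July lands on 27 May and 24 May, the dates in the sample logs.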

You may want something more like

code => "
    map['start_time'] ||= event.get('@timestamp').to_i
    map['last_time'] = event.get('@timestamp').to_i
"
timeout_code => "event.set('session_duration', event.get('last_time') - event.get('start_time'))"

Hello,

Yeah, they have current timestamps. In our logs, each request is a chunk of data written at the time of connection. Every timestamp in the logs is written when a user/device makes a request to the file/uri field. So I know when connections "start", just not when they end.

My sample log data ends on 5/27. These are test connections made by me, where I deliberately made one 25-minute connection from a single IP and then made two other 30-minute connections from a different IP. By connections, I mean I was tuned into a file via a web player for 25 minutes from IP 1, then severed the player connection. For the second IP I tuned into the player for roughly 30 minutes, severed the connection, and restarted it for another 30 minutes. If I were to take the delta between the start and end time of the second IP it would be 60 minutes, even though it's supposed to be two sessions of 30 minutes. Hence the session timeout.

I'll give this a try, but can you explain how 'last_time' would reference the timestamp at the timeout? To my non-Ruby brain it looks like both start_time and last_time are just storing the same timestamp in the code section.

||= will only do the assignment if the left-hand side is nil, so start_time will get set to the first timestamp seen. = will always do the assignment, so last_time will keep getting overwritten with the latest timestamp.

When the timeout occurs last_time will have the timestamp from the last log message.
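A minimal Ruby demonstration of the difference, using a plain Hash outside Logstash:

```ruby
map = {}

# First event: start_time is nil, so ||= assigns it; = assigns as always.
map['start_time'] ||= 100
map['last_time']    = 100

# Later event: ||= sees an existing value and leaves it alone,
# while = overwrites last_time with the newest timestamp.
map['start_time'] ||= 200
map['last_time']    = 200

duration = map['last_time'] - map['start_time']   # 100 seconds
```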

Hello,

That makes perfect sense. I've inverted the start_time and end_time as we need to subtract the start time from the end time:

timeout_code => "event.set('session_duration', event.get('start_time') - event.get('end_time'))"

This is working for the most part. I still have high duration counts, but I think some of this has to do with Elasticsearch parsing the server time as a timestamp versus the log message timestamp. For some reason the first IP in the log parses appropriately; the second IP in the log doesn't apply the aggregate filter correctly and produces a negative session_duration value.

Although the timestamps in my logs only account for days between May 24th - May 27th, I see the current server time of the indexed documents being parsed as well. And only the current July dates contain a session_duration field with corresponding documents. This is only for the first IP as well:

The aggregate filter is being applied to IP 111., however it contains the current timestamp with the duration. The aggregate filter is not being applied to IP 222., but it contains the timestamp from the log message and no session_duration value. Negative values aside, the session_duration for IP 111. is around 646 minutes, or about 11 hours, much higher than the expected 25 minutes.

Just bumping this for visibility