Aggregate filter plugin

occurred_on                      Bytes_Out    user_id
2017-12-31T10:25:49.000-0600     1718         ts2341
2017-12-31T10:25:08.000-0600     1070         ts2341
2017-12-31T10:25:08.000-0600     355          ts2341
2017-12-29T16:55:01.000-0600     255          gs2121
2017-12-29T16:55:01.000-0600     255          gs2121
2017-12-29T16:55:01.000-0600     255          gs2121
2017-12-29T16:01:53.000-0600     2618803      gs2121
2017-12-29T16:01:53.000-0600     1684         gs2121
2017-12-29T16:01:53.000-0600     1351         gs2121
2017-12-29T16:01:53.000-0600     95323        gs2121
2017-12-29T16:01:52.000-0600     3500         gs2121
2017-12-29T16:01:52.000-0600     255          gs2121

I want to calculate and ingest how much data is sent out on a daily basis, so I am trying to aggregate the logs by "userid" and sum the "bytes_out" column per day.
Requesting code/help on how to achieve this.

I assume you've had a look at the aggregate filter? If so, do you have any configuration in place so far?

Hi Paz,
I tried the below with no success; I also don't have any experience with Ruby.
filter {
    csv {
        separator => ","
        columns => ["occuredon", "bytes_out", "userid"]
    }
    mutate {
        lowercase => ["userid"]
        convert => ["bytes_out", "integer"]
    }
    date {
        match => ["occuredon", "yyyy/mm/dd HH:mm:ss", "ISO8601"]
        target => "@timestamp"
    }
    aggregate {
        task_id => "%{userid}"
        code => "map['sum'] += event.get(‘bytes_out’);"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "userid"
        timeout => 3600 # 1hr timeout
        timeout_tags => ['_aggregatetimeout']
        timeout_code => "event.set('bytesoutSum', event.get('sum'))"
    }
}

output {
    stdout { codec => rubydebug }
}

### codec output ###

"tags" => [
    [0] "_aggregateexception"
],

Hmm, that does sound like a code error indeed. By the way, it looks like bytes_out is enclosed in some weird quotes, and map['sum'] is never initialised, so the first += operates on nil. Can you try this one?

aggregate {
    task_id => "%{userid}"
    code => "map['bytesoutSum'] ||= 0 ; map['bytesoutSum'] += event.get('bytes_out').to_i"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "userid"
    timeout => 3600 # 1hr timeout
    timeout_tags => ['_aggregatetimeout']
}

I am not getting the "tags" => [[0] "_aggregateexception"] error any more.

However, I am not getting the consolidated logs for the hour. Below is the rubydebug codec output.

{
    "@timestamp" => 2017-12-31T12:47:53.000Z,
    "host" => "redes6.Delta.com",
    "bytes_out" => 1925,
    "path" => "/tmp/itd/msfs/MSFS2.csv",
    "@version" => "1",
    "userid" => "cd41763",
    "occured_on" => "2017-12-31T06:47:53.000-0600",
    "message" => "2017-12-31T06:47:53.000-0600,1925,cd41763\r"
}
{
    "@timestamp" => 2017-12-31T12:48:23.000Z,
    "host" => "redes6.Delta.com",
    "bytes_out" => 2669,
    "path" => "/tmp/itd/msfs/MSFS2.csv",
    "@version" => "1",
    "userid" => "cd41763",
    "occured_on" => "2017-12-31T06:48:23.000-0600",
    "message" => "2017-12-31T06:48:23.000-0600,2669,cd41763\r"
}
{
    "@timestamp" => 2017-12-31T12:48:24.000Z,
    "host" => "redes6.Delta.com",
    "bytes_out" => 1888,
    "path" => "/tmp/itd/msfs/MSFS2.csv",
    "@version" => "1",
    "userid" => "cd41763",
    "occured_on" => "2017-12-31T06:48:24.000-0600",
    "message" => "2017-12-31T06:48:24.000-0600,1888,cd41763\r"
}
That's expected behavior. What you see are the original events themselves; they still pass through to the output since you're not dropping them explicitly.

The aggregate filter does not alter the original events, it just creates new ones. You should start seeing the aggregated events 1 hour after Logstash started.
More specifically, each new user_id's aggregated event should be spawned 1 hour after the first time that user_id is seen.
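
As a minimal sketch (not from the replies above), the aggregated events can be told apart from the pass-through originals by the _aggregatetimeout tag set in the configuration earlier, for example:

output {
    # only the events created by the aggregate timeout carry this tag,
    # so this prints one summed event per user and skips the originals
    if "_aggregatetimeout" in [tags] {
        stdout { codec => rubydebug }
    }
}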

ok... will push them to ES and check...and get back...

Hi Paz,
Thanks, it is working :slight_smile: One more query: I am using a .csv file as input. Can I use the timestamp present in the .csv file, rather than the system time, for the timeout?

Thanks.

You mean like using that timestamp to control when the aggregation will expire, or something else?


Yes Paz, I want to use the date and time present in the .csv file for the timeout rather than the system time,
for example hourly, daily, or weekly aggregation of the data.

This one is resolved.
For the timestamp I am parsing the date one more time in the aggregation filter, and for the aggregation I am using %{+d} along with the user id to aggregate a day's data. Similarly, %{+ww} can be used to aggregate weekly data:

task_id => "%{userid}_%{+d}"
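
For completeness, a rough sketch of what the full filter section might look like when the pieces from this thread are put together (the field names, ISO8601 timestamps, and 3600-second timeout are taken from the earlier posts; treat the exact values as assumptions to adapt):

filter {
    csv {
        separator => ","
        columns => ["occured_on", "bytes_out", "userid"]
    }
    mutate {
        lowercase => ["userid"]
        convert => { "bytes_out" => "integer" }
    }
    date {
        # parse the CSV timestamp into @timestamp so that %{+d} below is
        # derived from the event's own time rather than the system time
        match => ["occured_on", "ISO8601"]
        target => "@timestamp"
    }
    aggregate {
        # one aggregation task per user per calendar day;
        # %{+ww} instead of %{+d} would give weekly buckets
        task_id => "%{userid}_%{+d}"
        code => "map['bytesoutSum'] ||= 0 ; map['bytesoutSum'] += event.get('bytes_out').to_i"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "userid"
        timeout => 3600
        timeout_tags => ['_aggregatetimeout']
    }
}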
