Hi Logstash Superheroes,
Not long ago, this forum helped me with the syntax for an Aggregate Filter Plugin. I got it working, but now I need to append Start and Stop timestamps to the aggregated records.
To explain… I have initial data records that look like this:
@timestamp  NameF  NameL  EmployeeID  Sales
===========================================
10000       Abby   Smith  12345       100
10001       Becky  Jones  67890       200
10002       Abby   Smith  12345       25
10005       Abby   Smith  12345       50
Using an aggregate filter (posted below), my Logstash can aggregate those sales numbers, which is great. But I’m realizing that in each aggregated record, I also need the first and last timestamps (“FirstTstamp” and “LastTstamp”) of the raw records. This will give me a nice time delta for later analysis:
@timestamp  NameF  NameL  EmployeeID  Sales  totalSales  FirstTstamp  LastTstamp  tags
===================================================================================================
10000       Abby   Smith  12345       100    null        null         null        null
10001       Becky  Jones  67890       200    null        null         null        null
10002       Abby   Smith  12345       25     null        null         null        null
10005       Abby   Smith  12345       50     null        null         null        null
10006       Becky  Jones  67890       null   200         10001        10001       _aggregatetimeout
10010       Abby   Smith  12345       null   175         10000        10005       _aggregatetimeout
Doesn’t seem like this should be too hard. In pseudocode:
New record arrives. Aggregate filter does the following:
  Does the aggregate map have a field named ‘FirstTstamp’?
    If no, create the field and insert the data record’s ‘@timestamp’ value here.
  In the aggregate map’s ‘LastTstamp’ field, insert the data record’s ‘@timestamp’ value.
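To make that intent concrete, the steps above can be sketched in plain Ruby. This is only an illustration under assumptions: an ordinary Hash stands in for the aggregate map, events are plain hashes rather than Logstash events, and record_event is a made-up helper name, not part of the plugin.

```ruby
# Plain-Ruby sketch: a Hash stands in for the aggregate filter's per-task map.
# record_event is a hypothetical helper, not part of Logstash.
def record_event(map, event)
  map['totalSales'] ||= 0
  map['totalSales'] += event['Sales']
  # ||= assigns only when the key is missing (or nil), so the first value sticks.
  map['FirstTstamp'] ||= event['@timestamp']
  # Plain assignment overwrites on every record, so the last value wins.
  map['LastTstamp'] = event['@timestamp']
  map
end

# Abby Smith's three raw records from the sample data above.
events = [
  { '@timestamp' => 10000, 'Sales' => 100 },
  { '@timestamp' => 10002, 'Sales' => 25 },
  { '@timestamp' => 10005, 'Sales' => 50 }
]

# Feed the records through in arrival order, sharing one map.
abby = events.each_with_object({}) { |event, map| record_event(map, event) }
```

After the three records, abby holds the first timestamp, the last timestamp, and the running sales total, which is the shape I want the aggregated record to have.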
Here’s the actual code:
aggregate {
  task_id => "%{EmployeeID}"
  code => "
    map['totalSales'] ||= 0;
    map['totalSales'] += event.get('Sales');
    map['NameF'] = event.get('NameF');
    map['NameL'] = event.get('NameL');
    if defined?(map['FirstTstamp'])
      map['FirstTstamp'] = event.get('@timestamp');
    end
    map['LastTstamp'] = event.get('@timestamp');
  "
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "EmployeeID"
  timeout => 5 # 5 second timeout
  timeout_tags => ['_aggregatetimeout']
}
Okay, here’s the problem. The above code puts the aggregated record’s own timestamp into both “FirstTstamp” and “LastTstamp,” which is not the information I want at all:
@timestamp  NameF  NameL  EmployeeID  Sales  totalSales  FirstTstamp  LastTstamp  tags
===================================================================================================
10006       Becky  Jones  67890       null   200         10006        10006       _aggregatetimeout
10010       Abby   Smith  12345       null   175         10010        10010       _aggregatetimeout
There’s got to be a way to do this, but a dozen Google searches and rummaging about on the forums haven’t revealed the way. Any advice? Thanks!