Hi Logstash Superheroes,
Not long ago, this forum helped me with the syntax for an Aggregate Filter Plugin. I got it working, but now I need to append Start and Stop timestamps to the aggregated records.
To explain… I have initial data records that look like this:
@timestamp  NameF   NameL   EmployeeID  Sales
=============================================
10000       Abby    Smith   12345       100
10001       Becky   Jones   12346       200
10002       Abby    Smith   12345        25
10005       Abby    Smith   12345        50
Using an aggregate filter (posted below), my Logstash can aggregate those sales numbers, which is great.  But I’m realizing that in each aggregated record, I also need the first and last timestamps (“FirstTstamp” and “LastTstamp”) of the raw records.  This will give me a nice time delta for later analysis:
@timestamp  NameF   NameL   EmployeeID  Sales  totalSales  FirstTstamp  LastTstamp  tags
================================================================================================
10000       Abby    Smith   12345       100    null        null          null          null
10001       Becky   Jones   67890       200    null        null          null          null
10002       Abby    Smith   12345        25    null        null          null          null
10005       Abby    Smith   12345        50    null        null          null          null
10006       Becky   Jones   67890       null    200        10001         10001      _aggregatetimeout
10010       Abby    Smith   12345       null    175        10000         10005      _aggregatetimeout   
Doesn’t seem like this should be too hard. In pseudocode:
New record arrives.  Aggregate filter does the following:
  Does the aggregate map have a field named ‘FirstTstamp’?
    If no, create field and insert data record’s ‘Timestamp’ value here.
  In the aggregate map’s ‘LastTstamp’ field, insert data record’s ‘Timestamp’ value.
Here’s the actual code:
  aggregate {
    task_id => "%{EmployeeID}"
    code => "
       map['totalSales'] ||= 0;
       map['totalSales'] += event.get('Sales');
       map['NameF'] = event.get('NameF');
       map['NameL'] = event.get('NameL');
       if defined?(map['FirstTstamp'])
         map['FirstTstamp'] = event.get('@timestamp');
       end
       map[‘LastTstamp'] = event.get('@timestamp');
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "EmployeeID"
    timeout => 5         # 5 second timeout
    timeout_tags => ['_aggregatetimeout']
  }
Okay, here’s the problem. The above code applies the timestamp of the aggregated record’s own timestamp in “FirstTstamp” and “LastTstamp,” which is not the information I want at all:
@timestamp NameF   NameL   EmployeeID  Sales  totalSales  FirstTstamp  LastTstamp  tags
========================================================================================================
10006      Becky   Jones   67890      null     200        10006        10006        _aggregatetimeout   
10010      Abby    Smith   12345      null     175        10010        10010        _aggregatetimeout   
There’s got to be a way to do this, but a dozen Google searches and rummaging about on the forums haven’t revealed the way. Any advice? Thanks!