Capturing Start and Stop Times in an Aggregated Data Record?

Hi Logstash Superheroes,

Not long ago, this forum helped me with the syntax for the aggregate filter plugin. I got it working, but now I need to add Start and Stop timestamps to the aggregated records.

To explain… I have initial data records that look like this:

@timestamp  NameF   NameL   EmployeeID  Sales
=============================================
10000       Abby    Smith   12345       100
10001       Becky   Jones   67890       200
10002       Abby    Smith   12345        25
10005       Abby    Smith   12345        50

Using an aggregate filter (posted below), my Logstash pipeline sums those sales numbers per employee, which is great. But I’ve realized that each aggregated record also needs the first and last timestamps (“FirstTstamp” and “LastTstamp”) of the raw records it covers. That gives me a time delta for later analysis. Here’s the output I’m after:

@timestamp  NameF   NameL   EmployeeID  Sales  totalSales  FirstTstamp  LastTstamp  tags
=====================================================================================================
10000       Abby    Smith   12345       100    null        null         null        null
10001       Becky   Jones   67890       200    null        null         null        null
10002       Abby    Smith   12345       25     null        null         null        null
10005       Abby    Smith   12345       50     null        null         null        null
10006       Becky   Jones   67890       null   200         10001        10001       _aggregatetimeout
10010       Abby    Smith   12345       null   175         10000        10005       _aggregatetimeout

Doesn’t seem like this should be too hard. In pseudocode:

New record arrives. The aggregate filter does the following:
  Does the aggregate map have a field named ‘FirstTstamp’?
    If not, create the field and insert the data record’s ‘@timestamp’ value.
  In the aggregate map’s ‘LastTstamp’ field, insert the data record’s ‘@timestamp’ value.
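The pseudocode maps almost line for line onto Ruby. Below is a minimal sketch that runs outside Logstash: it replays the four sample rows through one Hash per EmployeeID, much as the aggregate filter keeps one map per task_id, and also computes the time delta. Assumptions: plain integers stand in for @timestamp, Becky’s EmployeeID is taken as 67890 (as in the desired-output table), and update_map is a hypothetical helper name, not part of the plugin.

```ruby
# Plain-Ruby sketch of the pseudocode (runs outside Logstash).
# Integers stand in for @timestamp; update_map is a hypothetical helper.
ROWS = [
  { '@timestamp' => 10000, 'NameF' => 'Abby',  'NameL' => 'Smith', 'EmployeeID' => 12345, 'Sales' => 100 },
  { '@timestamp' => 10001, 'NameF' => 'Becky', 'NameL' => 'Jones', 'EmployeeID' => 67890, 'Sales' => 200 },
  { '@timestamp' => 10002, 'NameF' => 'Abby',  'NameL' => 'Smith', 'EmployeeID' => 12345, 'Sales' => 25 },
  { '@timestamp' => 10005, 'NameF' => 'Abby',  'NameL' => 'Smith', 'EmployeeID' => 12345, 'Sales' => 50 }
].freeze

# Apply one raw record to the map for its task, following the pseudocode.
def update_map(map, event)
  map['totalSales'] = (map['totalSales'] || 0) + event['Sales']
  map['NameF'] = event['NameF']
  map['NameL'] = event['NameL']
  # "Does the map have a field named FirstTstamp? If not, create it."
  map['FirstTstamp'] = event['@timestamp'] unless map.key?('FirstTstamp')
  # LastTstamp is overwritten by every record, so it ends up as the last one processed.
  map['LastTstamp'] = event['@timestamp']
  map
end

# One map per EmployeeID, created on first use (like one map per task_id).
maps = Hash.new { |hash, id| hash[id] = {} }
ROWS.each { |row| update_map(maps[row['EmployeeID']], row) }

maps.each do |id, m|
  delta = m['LastTstamp'] - m['FirstTstamp']
  puts "#{id} #{m['NameF']} totalSales=#{m['totalSales']} " \
       "first=#{m['FirstTstamp']} last=#{m['LastTstamp']} delta=#{delta}"
end
```

Running it prints one line per employee; Abby comes out with totalSales=175, first=10000, last=10005, delta=5, which matches the desired output above.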

Here’s the actual code:

  aggregate {
    task_id => "%{EmployeeID}"
    code => "
       map['totalSales'] ||= 0;
       map['totalSales'] += event.get('Sales');
       map['NameF'] = event.get('NameF');
       map['NameL'] = event.get('NameL');
       if defined?(map['FirstTstamp'])
         map['FirstTstamp'] = event.get('@timestamp');
       end
       map['LastTstamp'] = event.get('@timestamp');
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "EmployeeID"
    timeout => 5         # 5 second timeout
    timeout_tags => ['_aggregatetimeout']
  }

Okay, here’s the problem. The code above puts the aggregated record’s own timestamp into both “FirstTstamp” and “LastTstamp,” which is not the information I want at all:

@timestamp  NameF   NameL   EmployeeID  Sales  totalSales  FirstTstamp  LastTstamp  tags
=====================================================================================================
10006       Becky   Jones   67890       null   200         10006        10006       _aggregatetimeout
10010       Abby    Smith   12345       null   175         10010        10010       _aggregatetimeout

There’s got to be a way to do this, but a dozen Google searches and some rummaging around the forums haven’t turned it up. Any advice? Thanks!

Should that be if !defined?

Ah, good catch, Badger. You're really sharp, I appreciate your eye!

When I make that edit:

   if !defined?(map['FirstTstamp'])
     map['FirstTstamp'] = event.get('@timestamp');
   end
   map['LastTstamp'] = event.get('@timestamp');

The aggregated data now looks like this:

@timestamp  NameF   NameL   EmployeeID  Sales  totalSales  FirstTstamp  LastTstamp  tags
=====================================================================================================
10006       Becky   Jones   67890       null   200         null         10001       _aggregatetimeout
10010       Abby    Smith   12345       null   175         null         10005       _aggregatetimeout

The LastTstamp is correct (yay!), but the FirstTstamp is never recorded. There's got to be a way to capture that!
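The reason the !defined? version never sets FirstTstamp is a Ruby quirk: defined? reports whether an expression could be evaluated, not whether the hash holds a value. map['FirstTstamp'] is a call to Hash#[], so defined? returns "method" whether or not the key exists. That also explains why the earlier `if defined?` version overwrote FirstTstamp on every event. A minimal plain-Ruby demonstration, with an ordinary Hash standing in for the aggregate map (an assumption made for illustration):

```ruby
# An empty Hash standing in for a fresh aggregate map.
map = {}

result = defined?(map['FirstTstamp'])
puts result.inspect                 # prints "method", although the key is absent
puts map['FirstTstamp'].inspect     # prints nil

# So `if !defined?(...)` is always false and the assignment never runs.
first_set = false
first_set = true if !defined?(map['FirstTstamp'])
puts first_set                      # prints false

# Checks that actually look at the hash contents:
puts map.key?('FirstTstamp')        # prints false
puts map['FirstTstamp'].nil?        # prints true
```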

I tried this as a variation:

    if !defined?(map['FirstTstamp']) || map['FirstTstamp'] > event.get('@timestamp')
      map['FirstTstamp'] = event.get('@timestamp');
    end

In other words, "if FirstTstamp doesn't exist yet, or if the value in that field is a later timestamp than the current one, put the current timestamp into FirstTstamp." I'm probably getting the logic wrong, and the above threw a scary Java error anyway.

Any ideas? I was sort of hoping someone else had already asked how to capture the min and max timestamps of an aggregated record. Thank you!
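The comparison idea in that variation (keep the earliest timestamp even if events arrive out of order) can be written so it runs. A minimal plain-Ruby sketch follows; it tests for nil first, because defined? is always truthy for map['FirstTstamp'] and calling > on nil raises NoMethodError. It assumes the timestamp values can be compared with < and > (plain integers stand in for @timestamp here; whether LogStash::Timestamp supports that directly is worth checking). track_range is a hypothetical helper name.

```ruby
# Keep the earliest and latest timestamp seen for a task, even when events
# arrive out of order. track_range is a hypothetical helper; inside the
# aggregate filter the two assignments would use ts = event.get('@timestamp').
def track_range(map, ts)
  # Test for nil first so the comparison never runs against nil.
  map['FirstTstamp'] = ts if map['FirstTstamp'].nil? || ts < map['FirstTstamp']
  map['LastTstamp'] = ts if map['LastTstamp'].nil? || ts > map['LastTstamp']
  map
end

map = {}
[10002, 10000, 10005].each { |ts| track_range(map, ts) }  # deliberately out of order
puts map['FirstTstamp']  # prints 10000
puts map['LastTstamp']   # prints 10005
```

Unlike a first-arrival rule, this keeps the true minimum and maximum regardless of the order in which a task's events are processed.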

I suggest trying

map['FirstTstamp'] ||= event.get('@timestamp');

That does the assignment only when map['FirstTstamp'] is nil (or false), so the first event for each task sets it and later events leave it alone.
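A quick plain-Ruby check of why that works, replaying Abby’s three sample timestamps (integers stand in for @timestamp, and an ordinary Hash stands in for the aggregate map). One caveat: ||= keeps the timestamp of the first event to arrive, which is the earliest only if each task’s events reach the filter in time order.

```ruby
# map['FirstTstamp'] ||= ts behaves like
#   map['FirstTstamp'] || (map['FirstTstamp'] = ts)
# so it assigns only while the current value is nil (or false).
map = {}
[10000, 10002, 10005].each do |ts|
  map['FirstTstamp'] ||= ts  # set by the first event, then left alone
  map['LastTstamp'] = ts     # overwritten by every event
end
puts map['FirstTstamp']  # prints 10000
puts map['LastTstamp']   # prints 10005

# The "(or false)" part: a stored false is also replaced.
flags = { 'seen' => false }
flags['seen'] ||= true
puts flags['seen']       # prints true
```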

Bingo! That did it. Thank you, Badger, you're a Logstash genius! Much appreciated...
