Aggregate documents

Hi all,

I've created a pipeline that aggregates all input documents and generates a count for each group.

At the moment I have three problems:

  1. on the first execution the aggregation works fine and the count is correct
  2. after the first execution each count doubles; I think this is because the map remains in memory
  3. all input documents are sent to the output

What I want is:

  1. a way to release the map after each execution
  2. to send only the aggregated documents to the output

Please help me.

Thank you in advance,
Diego

Here is my pipeline:

input {
  elasticsearch {
    # read all documents
  }
}

filter {

  ruby {
    code => "
      require 'time'
      if event.get('reg512Created')
        date = Time.parse(event.get('reg512Created'))
        event.set('reg512Created_day', date.strftime('%d-%m-%Y'))
      end
    "
  }

  aggregate {
    task_id => "%{reg512UPNRequestor}-%{reg512Created_day}"
    code => "
      map['regCount'] ||= 0; map['regCount'] += 1
      event.set('regCount', map['regCount'])
    "
    push_previous_map_as_event => true
    timeout => 45
  }

  aggregate {
    task_id => "%{reg512UPNRequestor}-%{reg512Created_day}"
    end_of_task => true
  }

}

output {
  elasticsearch {
    # receive all documents
  }
}

I wouldn't expect this to work.

Getting rid of the source documents and keeping only the aggregations is simple: just add event.cancel to the code option of the first aggregate.

The second aggregate will not be triggered by the aggregated events, since those events do not have the [reg512UPNRequestor] and [reg512Created_day] fields, so for them the filter will be a no-op. You need to add those fields to the map if you want them to be present in the aggregated events. You may want to add the task_id as well.
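To see why the pushed map events miss the second aggregate, note that Logstash leaves an unresolved %{field} reference in a sprintf string as the literal text. Here is a minimal plain-Ruby sketch of that behavior (task_id_for is a hypothetical helper written for this illustration, not actual Logstash code):

```ruby
# Simulate how a sprintf-style task_id resolves against an event.
# Unresolved %{field} references stay literal, as in Logstash.
def task_id_for(event)
  "%{reg512UPNRequestor}-%{reg512Created_day}".gsub(/%\{(\w+)\}/) do
    event.fetch(Regexp.last_match(1), "%{#{Regexp.last_match(1)}}")
  end
end

source_event = { 'reg512UPNRequestor' => 'alice', 'reg512Created_day' => '01-01-2024' }
pushed_event = { 'regCount' => 7 }  # pushed map event: task fields were never copied in

puts task_id_for(source_event)  # a real per-user, per-day key
puts task_id_for(pushed_event)  # the literal "%{reg512UPNRequestor}-%{reg512Created_day}"
```

Every pushed map event therefore gets the same literal task_id, which never matches the task_id of any source document.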

I would expect the second aggregate to be triggered by every source document. It is clearly not, but I do not understand why.

Hi Badger, thanks for your reply. I've solved it like this:

  aggregate {
    task_id => "%{reg512UPNRequestor}-%{reg512Created_day}"
    code => "
      map['field-1'] = event.get('reg512UPNRequestor')
      map['field-2'] = event.get('reg512Created_day')
      map['myCount'] ||= 0; map['myCount'] += 1
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }

This works very well.
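For anyone following along, the logic of this fixed aggregate can be sketched in plain Ruby (a simplified simulation, not the plugin itself): each source event increments a per-group map and is then cancelled, so only the maps are emitted, which is what push_previous_map_as_event does.

```ruby
# Plain-Ruby sketch of the fixed aggregate: count source events per
# "%{reg512UPNRequestor}-%{reg512Created_day}" key, copy the key fields
# into the map, drop the source events, and emit only the maps.
def aggregate(events)
  maps = Hash.new { |h, k| h[k] = { 'myCount' => 0 } }
  events.each do |e|
    key = "#{e['reg512UPNRequestor']}-#{e['reg512Created_day']}"
    m = maps[key]
    m['field-1'] = e['reg512UPNRequestor']
    m['field-2'] = e['reg512Created_day']
    m['myCount'] += 1
    # event.cancel() -> the source event itself never reaches the output
  end
  maps.values  # the aggregated documents push_previous_map_as_event would emit
end

events = [
  { 'reg512UPNRequestor' => 'alice', 'reg512Created_day' => '01-01-2024' },
  { 'reg512UPNRequestor' => 'alice', 'reg512Created_day' => '01-01-2024' },
  { 'reg512UPNRequestor' => 'bob',   'reg512Created_day' => '01-01-2024' },
]
aggregate(events)
```

With this input, the sketch yields one aggregated document per group rather than one per source event, matching what the pipeline now sends to Elasticsearch.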