RSpec: Testing Aggregate Filters

I'm trying to unittest a filter that utilizes aggregate filter.
In my sample, I'm dropping all messages and wait for the timeout to occur instead. I'd like to retrieve this async message somehow and verify if the contents.

Sample Pipeline config:

filter {
  grok {
      match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ]
    }

    aggregate {
        task_id => "%{user_id}"
        code => "map['clicks'] ||= 0; map['clicks'] += 1;"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "user_id"
        push_previous_map_as_event => true
        timeout => 1
        timeout_tags => ['_aggregatetimeout']
        timeout_code => "event.set('several_clicks', event.get('clicks') > 1)"
    }

    drop {}
}

Sample test

message1 = 'INFO - 12345 - Clicked One'
        message2 = 'INFO - 12345 - Clicked Two'
        message3 = 'INFO - 12345 - Clicked Three'
        sample([message1, message2, message3], :validation_source => :queue) do
       
          # Check the ouput event/message properties or in my case...try to figure out how to get these message
          sleep(5)

          puts(results.last)
          puts(pipeline.filter_queue_client.read_batch)

Any help is appreciated (probably my lack of good ruby knowledge isn't helping me as well...)

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.