I'm trying to unittest a filter that utilizes aggregate filter.
In my sample, I'm dropping all messages and wait for the timeout to occur instead. I'd like to retrieve this async message somehow and verify if the contents.
Sample Pipeline config:
filter {
grok {
match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ]
}
aggregate {
task_id => "%{user_id}"
code => "map['clicks'] ||= 0; map['clicks'] += 1;"
push_map_as_event_on_timeout => true
timeout_task_id_field => "user_id"
push_previous_map_as_event => true
timeout => 1
timeout_tags => ['_aggregatetimeout']
timeout_code => "event.set('several_clicks', event.get('clicks') > 1)"
}
drop {}
}
Sample test
message1 = 'INFO - 12345 - Clicked One'
message2 = 'INFO - 12345 - Clicked Two'
message3 = 'INFO - 12345 - Clicked Three'
sample([message1, message2, message3], :validation_source => :queue) do
# Check the ouput event/message properties or in my case...try to figure out how to get these message
sleep(5)
puts(results.last)
puts(pipeline.filter_queue_client.read_batch)
Any help is appreciated (probably my lack of good ruby knowledge isn't helping me as well...)