Hello,
I am trying to do the following:
Take input on TCP and send it to an HTTP endpoint in batches. I am using the aggregate filter to batch events and then sending each batch to the HTTP endpoint as a JSON array. Here is what the config looks like:
input {
  tcp {
    port => 8081
  }
}
filter {
  ruby {
    code => 'event.set("randomNumber", rand(10))'
  }
  aggregate {
    task_id => "%{randomNumber}"
    timeout => 5
    push_map_as_event_on_timeout => true
    code => "
      map['messages'] ||= []
      map['messages'] << event.get('message')
      event.cancel()
    "
  }
  json_encode {
    source => "messages"
    target => "messages"
  }
}
output {
  http {
    http_method => "post"
    headers => ["Content-Type", "application/json", "Content-Encoding", "gzip"]
    url => "http://localhost:8086/logs/json"
    format => "message"
    message => "%{messages}"
    content_type => "application/json"
    http_compression => "true"
  }
}
Pipeline workers (pipeline.workers) is set to 20.
My expectation is:
The aggregate filter will put the incoming data into 10 random buckets and send each bucket to the output after the 5-second timeout. I expect to see up to 10 parallel HTTP calls.
However, I only see one call, and if that call is slow, no other HTTP call is made.
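To make the expectation concrete, here is a minimal plain-Ruby sketch of the bucketing I have in mind. It runs outside Logstash, and the helper names `bucket_messages` and `bucket_payloads` are hypothetical, not part of any plugin. It only illustrates the intended grouping, not how Logstash actually schedules the HTTP calls.

```ruby
require 'json'

# Hypothetical helper (not a Logstash API): assigns each message to one of
# bucket_count buckets at random, mirroring the intent of the ruby filter
# (randomNumber) plus the aggregate filter (map['messages']).
def bucket_messages(messages, bucket_count: 10, rng: Random.new)
  buckets = Hash.new { |hash, key| hash[key] = [] }
  messages.each do |message|
    task_id = rng.rand(bucket_count) # plays the role of rand(10) in the config
    buckets[task_id] << message      # plays the role of map['messages'] << ...
  end
  buckets
end

# Hypothetical helper: each non-empty bucket becomes one request body,
# encoded as a JSON array of messages.
def bucket_payloads(buckets)
  buckets.values.map { |msgs| JSON.generate(msgs) }
end

messages = (1..100).map { |i| "line #{i}" }
payloads = bucket_payloads(bucket_messages(messages))
puts "#{payloads.size} payloads (expected: at most 10, one per bucket)"
```

In other words, after each timeout I would expect one request body per non-empty bucket, and I assumed those requests could go out in parallel.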