Logstash http output plugin customize building json array for batching

Hello,

We have a pipeline running on logstash 6.6.2 with input(redis)->filter(ruby)->output(http)

How do you batch the http output using a custom format? We can't use format="json_batch" because it constructs a plain JSON array, which is invalid per the schema defined by the HTTP endpoint (owned by a different team).

How do I go about combining a batch of messages using my own JSON format? Any ideas/suggestions?


Input log messages:

{json1} , {json2} , {json3} .. {jsonN}

output (with format="json_batch"):

 [{json1} , {json2} , {json3} .. {jsonN}] 

output (customized):

{ records : [{json1} , {json2} , {json3} .. {jsonN}] }

That is not simple. outputs sometimes have a receive method that consumes a single event, and sometimes have a multi_receive method that consumes an array of events (a batch). The multi_receive method typically iterates over the array and processes the events one at a time. The http output is unusual in that it can process the entire array in one shot. However, it provides no flexibility on the request format that it sends over http.
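As a rough Ruby sketch of those two entry points (the class and method bodies here are illustrative only, not the real Logstash plugin base class, which outputs inherit from `LogStash::Outputs::Base`):

```ruby
# Illustrative sketch of the two ways an output can consume events.
class ExampleOutput
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Some outputs implement receive, consuming a single event at a time.
  def receive(event)
    @sent << event
  end

  # Others implement multi_receive, consuming a whole batch (an array of
  # events). The typical implementation just iterates, as here; the http
  # output is unusual in that it can post the entire array in one request.
  def multi_receive(events)
    events.each { |event| receive(event) }
  end
end
```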

That means you will have to do some work in the filter section, and the filter section does not see a batch, it sees individual events.

If you are OK with sending one event per request

{ records : [{json1}] }
{ records : [{json2}] }
{ records : [{json3}] }

etc., then modifying the format is not very hard.
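For the one-event-per-request case, a minimal sketch: serialize the event to JSON in a ruby filter, then wrap it using the http output's `format => "message"` mode (the URL is a placeholder, and using a `[@metadata]` field to carry the serialized payload is my assumption):

```
filter {
  ruby {
    # serialize the whole event to JSON into a field we can reference below
    code => "event.set('[@metadata][payload]', event.to_json)"
  }
}
output {
  http {
    url => "https://example.com/ingest"   # placeholder endpoint
    http_method => "post"
    format => "message"
    content_type => "application/json"
    # wrap the serialized event in the schema the endpoint expects
    message => '{ "records": [ %{[@metadata][payload]} ] }'
  }
}
```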

Otherwise you are going to have to aggregate the events to create a batch. That can be done. There is an example of combining events with a limit on size here. You could change that to add a set number of events to an array, effectively re-creating the batch.

Which approach do you prefer?

Thanks Badger. Since I am trying to optimize the number of HTTP calls, anything I can do to send a batch of messages in a single request would help.

In your example, the aggregate filter seems to need a task_id to group events, whereas the log messages I have do not have any correlation between them. How can we use it?

I see what you are saying: basically treat all messages as part of the same task_id and create batches based either on size or on the number of messages. That should work :slight_smile:

Assuming you do not care which events are combined with which others, you can add one

mutate { add_field => { "[@metadata][task]" => "1" } }

and then use

task_id => "[@metadata][task]"

Normally you have to set pipeline.workers to 1 to use aggregate, but in this case it may work even if you do not.
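Putting the two pieces together, a hedged sketch of size-based batching with aggregate (the batch size of 50 and the `message` source field are assumptions; the code block is illustrative):

```
filter {
  mutate { add_field => { "[@metadata][task]" => "1" } }
  aggregate {
    task_id => "[@metadata][task]"
    code => "
      map['records'] ||= []
      map['records'] << event.get('message')
      if map['records'].size >= 50
        # flush: turn the current event into the batch
        event.set('records', map['records'])
        map['records'] = []
      else
        # this event is now inside the map; drop it
        event.cancel
      end
    "
  }
}
```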


Hi Badger,

I am able to aggregate and batch the events to the output successfully; however, it looks like the timeout functionality is not working. I have set the following in the first aggregate filter.

push_map_as_event_on_timeout => true
timeout => 10
timeout_code => "event.set('[@metadata][timeToFlush]', true)"
timeout_task_id_field => "[@metadata][taskid]"
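For context, a sketch of how those timeout options sit on a single aggregate filter for the task_id (the timeout options should be defined in only one aggregate filter per task_id pattern; the code block is illustrative):

```
filter {
  aggregate {
    task_id => "[@metadata][taskid]"
    code => "
      map['records'] ||= []
      map['records'] << event.get('message')
      event.cancel   # raw events are absorbed into the map
    "
    push_map_as_event_on_timeout => true
    timeout => 10
    timeout_code => "event.set('[@metadata][timeToFlush]', true)"
    timeout_task_id_field => "[@metadata][taskid]"
  }
}
```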

I would suggest removing the drop {} and replacing the output with

stdout { codec => rubydebug { metadata => true } }

output. See if the timeout event shows up.

Thanks Badger. I had some output conditions based on original event properties; for timeout scenarios those were not set. Once that was fixed, everything worked :slight_smile:

It looks like I am sometimes getting individual events at the output, even though everything there should be aggregated or timed-out events. Is that expected? (Any raw event should have been dropped at the filter.) Is this because of some kind of concurrency issue?
I was also able to confirm that these individual events reaching the output were also output as part of aggregated events.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.