Logstash http output plugin: building a custom JSON array for batching

Hello,

We have a pipeline running on Logstash 6.6.2 with input (redis) -> filter (ruby) -> output (http).

How do you batch the http output using a custom format? We can't use format => "json_batch" because it constructs a plain JSON array, which is invalid per the schema defined by the HTTP endpoint (owned by a different team).

How do I go about combining a batch of messages using my own JSON format? Any ideas/suggestions?


Input log messages:

{json1}, {json2}, {json3}, ..., {jsonN}

Output (with format => "json_batch"):

[{json1}, {json2}, {json3}, ..., {jsonN}]

Output (customized):

{ "records": [{json1}, {json2}, {json3}, ..., {jsonN}] }

That is not simple. Outputs sometimes have a receive method that consumes a single event, and sometimes have a multi_receive method that consumes an array of events (a batch). The multi_receive method typically iterates over the array and processes the events one at a time. The http output is unusual in that it can process the entire array in one shot; however, it provides no flexibility in the request format it sends over HTTP.
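To make the distinction concrete, here is an illustrative plain-Ruby sketch (not the real plugin source; the method names mirror the Logstash output-plugin API, and the `@sender` callback stands in for whatever actually ships the data):

```ruby
# Sketch of the two output-plugin entry points. In real plugins these
# methods are defined on a subclass of LogStash::Outputs::Base.
class ExampleOutput
  def initialize(&sender)
    @sender = sender # stand-in for the code that performs the HTTP request
  end

  # Single-event entry point: one event, one call to the sink.
  def receive(event)
    @sender.call([event])
  end

  # Batch entry point: the http output's json_batch path hands the whole
  # array to one request instead of looping event-by-event.
  def multi_receive(events)
    @sender.call(events)
  end
end
```

The point is that even when multi_receive gets the whole batch at once, the http output fixes the serialization, so reshaping the payload has to happen earlier in the pipeline.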

That means you will have to do some work in the filter section, and the filter section does not see a batch; it sees individual events.

If you are OK with sending one event per request

{ "records": [{json1}] }
{ "records": [{json2}] }
{ "records": [{json3}] }

etc., then modifying the format is not very hard.
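For example, one way to get that per-event shape is to serialize the event to a string in a ruby filter and wrap it with the http output's message template. This is a sketch, not tested on 6.6.2; the URL is a placeholder and the [@metadata][payload] field name is my own invention:

```
filter {
  # Serialize the whole event to a JSON string so the output template
  # can embed it verbatim.
  ruby {
    code => 'event.set("[@metadata][payload]", event.to_json)'
  }
}

output {
  http {
    url          => "https://example.test/ingest"   # placeholder endpoint
    http_method  => "post"
    content_type => "application/json"
    format       => "message"
    message      => '{ "records": [%{[@metadata][payload]}] }'
  }
}
```

Using @metadata keeps the serialized copy out of the event itself, so it does not get double-encoded into the payload.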

Otherwise you are going to have to aggregate the events to create a batch. That can be done. There is an example of combining events with a limit on size here. You could change that to add a set number of events to an array, effectively re-creating the batch.

Which approach do you prefer?

Thanks Badger. Since I am trying to optimize the number of HTTP calls, anything I can do to send a batch of messages in a single request would help.

In your example, the aggregate filter seems to need a task_id to group events, whereas my log messages have no correlation between them. How can we use it?

I see what you are saying: basically treat all messages as part of the same task_id and create batches based on either size or number of messages. That should work :slight_smile:

Assuming you do not care which events are combined with which others, you can add one

mutate { add_field => { "[@metadata][task]" => "1" } }

and then use

task_id => "[@metadata][task]"

Normally you have to set pipeline.workers to 1 to use aggregate, but in this case it may work even if you do not.
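Putting the pieces together, here is a minimal sketch of the idea using a time-based flush. The URL and the 5-second timeout are placeholders, a strict count-based flush would need extra bookkeeping inside the code block, and since the flush depends on a single shared map, pipeline.workers set to 1 remains the safe choice:

```
filter {
  # Give every event the same constant task id so aggregate groups them all.
  mutate { add_field => { "[@metadata][task]" => "1" } }

  aggregate {
    task_id => "%{[@metadata][task]}"
    code => "
      map['records'] ||= []
      map['records'] << event.to_hash
      event.cancel
    "
    # Periodically push the accumulated map as a single new event.
    push_map_as_event_on_timeout => true
    timeout => 5
  }
}

output {
  # The flushed event carries a 'records' array field, so the plain json
  # format serializes it as { ..., "records": [{json1}, {json2}, ...] }.
  http {
    url         => "https://example.test/ingest"   # placeholder endpoint
    http_method => "post"
    format      => "json"
  }
}
```

The original events are cancelled inside the aggregate code, so only the batched events reach the output; note the flushed event will also carry the usual @timestamp and any fields aggregate adds.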
