Make a single output call with multiple events

I've spent a few days searching but can't find what I'm looking for. The closest I've found is this topic (How to aggregate multiple events into single output).

I'll start by saying that I have a functional pipeline. It reads from a JDBC input that issues a query, runs several filters to get the data the way I need it, then uses an http output to put the data where I want it. What I'm trying to do is optimize the output. My http endpoint can accept either a single event or multiple events in a single call. What I would like to do is collect all of my input events (anywhere from 1 to n), combine them in the filter section into a single event, and call my http service once. This would cut down on the number of network calls my pipeline makes and greatly increase my throughput. I've come across the aggregate filter, and on the surface it seems close to what I'm looking for, but I'm not trying to take a duration from 3 separate events and sum them together. I want to take ALL of 3 events, append each of them to a single hash, then take that hash and make a single output call.

If the aggregate filter is the right filter to use, I can't seem to get the syntax right to grab all of a single event and add it to something else. Another problem I have with aggregate is that it needs a task_id. Other than the fact that all of these events come from the same source and are going to the same destination, they don't really have anything in common; I just want to reduce my output count to 1. Any/all assistance is appreciated.
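For reference, the overall shape of the pipeline today is roughly this (the connection details, query, and URL are placeholders, not the real values):

    input {
        jdbc {
            jdbc_connection_string => "jdbc:..."   # placeholder
            jdbc_driver_library => "..."           # placeholder
            jdbc_driver_class => "..."             # placeholder
            statement => "SELECT ..."              # placeholder
        }
    }
    filter {
        # several filters that shape each row into the event I need
    }
    output {
        http {
            url => "URL in question"               # placeholder
            http_method => "post"
        }
    }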

You could try something like this

    ruby {
        init => '@counter = 0; BATCH = 3'
        code => '
            event.set("[@metadata][index]", BATCH - @counter % BATCH)
            @counter += 1
        '
    }
    mutate { add_field => { "[@metadata][task_id]" => 1 } }
    if [@metadata][index] != 1 {
        aggregate {
            task_id => "%{[@metadata][task_id]}"
            code => '
                map["data"] ||= []
                map["data"] << event.to_hash
                event.cancel
            '
        }
    } else {
        aggregate {
            task_id => "%{[@metadata][task_id]}"
            code => '
                map["data"] ||= []
                map["data"] << event.to_hash
                event.set("data", map["data"])
            '
            map_action => "update"
            end_of_task => true
            timeout => 120
        }
    }

which will produce events like

{
      "data" => [
    [0] {
                 "a" => 1,
             "event" => {
            "original" => "{ \"a\": 1 }",
            "sequence" => 0
        },
        "@timestamp" => 2022-10-04T23:32:53.344931310Z,
              "host" => {
            "name" => "ip-172-31-22-149.us-east-2.compute.internal"
        },
          "@version" => "1"
    },
    [1] {
                 "a" => 1,
             "event" => {
            "original" => "{ \"a\": 1 }",
            "sequence" => 1
        },
        "@timestamp" => 2022-10-04T23:32:53.350508434Z,
              "host" => {
            "name" => "..."
        },
          "@version" => "1"
    },
    [2] {
                 "a" => 1,
             "event" => {
            "original" => "{ \"a\": 1 }",
            "sequence" => 2
        },
        "@timestamp" => 2022-10-04T23:32:53.352077391Z,
              "host" => {
            "name" => "..."
        },
          "@version" => "1"
    }
],
         "a" => 1,
"@timestamp" => 2022-10-04T23:32:53.352077391Z,
      "host" => {
    "name" => "..."
},
  "@version" => "1"
}

You may want to

    mutate { remove_field => [ "event" ] }

and make other changes before the aggregate, and you may want to remove fields that are duplicated in the event that is not cancelled.
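In the sample above, for example, the event that is not cancelled still carries its own a and host fields alongside data, so something like this after the aggregate (just a sketch, using the field names from that sample) would strip them:

    mutate { remove_field => [ "a", "host" ] }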


I'm sorry, I'm fairly new to Logstash and know NO Ruby, but let me see if I understand what it's doing here.

All of this goes in the filter section, and each event will cause the filter section to fire. So:

First you are doing some Ruby, initializing a counter to 0 and BATCH to 3. What does the @ prefix on counter do, versus no prefix on BATCH?
Then in the code you are setting [@metadata][index] = BATCH - counter % BATCH and incrementing the counter by 1. I can't say I really understand what this code is doing, so let's move on to the next filter.

Now you are doing a mutate with add_field, setting the task_id to 1. I assume this is because aggregate NEEDS a task_id and one doesn't exist, so we are creating one so that aggregate can treat all of the events as if they are part of the same task.

Now, if the index != 1 (something to do with the Ruby code above that I don't fully understand), we stuff this event's contents into a map entry named data, and we keep doing this until we have reached the batch size.

Otherwise, if we have reached the batch size, we do the same stuffing of the current event into the data array, we update the map, and we end the task.

So, what I end up with at the end of my batch is my final event, and a data hash that contains all of the events up to that point.

I have two problems. Let's say that I have 15 events and I set my batch size to 10: my output is called after 10 events, then the other 5 are processed but nothing is ever done with them. Do I need a timeout_code to handle that situation? If so, what would it be?

The second issue is that I now have an event that has all of the elements of my final event AND a data array that is an aggregation of all of my events. I can deal with that, but in my http output I only want the data array to be sent, so I'm trying this:

    http {
        url => "URL in question"
        http_method => "post"
        content_type => "application/json"
        format => "message"
        retry_failed => false
        http_compression => true
        headers => {
            "Content-Type" => "application/json"
            "Authorization" => "token"
        }
        message => "%{data}"
    }

For message, I'm having trouble with the format. The endpoint expects a JSON array, so I need it to be

[{content},{content}]

My data array should already be {content},{content}, I think, so I've tried

"[%{data}]"

and

"%{[data]}"

but I can't seem to find a combination of fields that keeps my remote system from throwing a 400 error.

So, your assistance with these two remaining questions would be appreciated :slight_smile:

I was able to solve that second problem by adding a prune filter

    prune {
        whitelist_names => ["data"]
    }

then I added another filter

    json_encode {
        source => "data"
    }

which transforms the field from Logstash's internal format to JSON. Then in the http output I have this as my message:

    message => "%{data}"
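Putting those pieces together, the tail of my filter section plus the output now look roughly like this (URL is a placeholder and the auth header is trimmed; note that json_encode comes from a separate plugin, logstash-filter-json_encode, so it may need installing with bin/logstash-plugin install if it isn't already there):

    filter {
        # ... aggregation from above ...
        prune { whitelist_names => ["data"] }
        json_encode { source => "data" }   # serializes the array to a JSON string
    }
    output {
        http {
            url => "URL in question"       # placeholder
            http_method => "post"
            format => "message"
            content_type => "application/json"
            message => "%{data}"           # the encoded array, i.e. [{content},{content}]
        }
    }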

So I almost have everything that I need. The only thing left is the ability to process a partial chunk, and I'm not sure exactly how to go about doing that.

I want to combine multiple events into a single event, in this case three at a time. In Ruby, any "variable" that starts with a capital letter is a constant. I increment the counter for each event and add a value derived from the counter modulo three to [@metadata]. The result of the ruby filter is that the [@metadata][index] value on a sequence of events will be 3, 2, 1, 3, 2, 1, 3...

In Ruby, prefixing a variable name with @ gives it instance scope, so it can be used anywhere in that filter and keeps its value from one event to the next.
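To tie that back to the config, here is the same ruby filter again with those two scoping rules called out in comments:

    ruby {
        # init runs once, when the pipeline starts
        init => '@counter = 0; BATCH = 3'
        code => '
            # @counter is an instance variable, so it keeps its value across events;
            # BATCH is a constant (capitalized), set once in init and only read here
            event.set("[@metadata][index]", BATCH - @counter % BATCH)
            @counter += 1
        '
    }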

If the [@metadata][index] is not one, then stash the contents of the event and drop the event. If it is one, then stash the contents of the event and add the stash to the current event. This results in three events being combined into one event that has data from all three.

I hadn't thought through what happens if a timeout occurs. It's probably not good. You could add a timeout_code option but it doesn't have an event to attach map["data"] to. I cannot think of a solution right now.

That is an excellent explanation, thank you!

Let's work through this a different way. Is there a way in the filter section to know how many events there are in the overall input? Is there a way to set the batch size to that number automatically, so that the entire input is handled in one batch? If that were done, it would prevent any inputs from being dropped, right?

In the filter section I am not aware of any way to know the batch size.

OK, after working on it most of the day, this is what I ended up with:

    mutate {
        add_field => { "[@metadata][task_id]" => 1 }
    }
    aggregate {
        task_id => "%{[@metadata][task_id]}"
        code => '
            map["data"] ||= []
            map["data"] << event.to_hash
            event.set("data", map["data"])
        '
        push_map_as_event_on_timeout => true
        timeout => 5
    }
    if [data] {
        ruby {
            code => 'event.set("event_length", event.get("data").length)'
        }
    } else {
        mutate { add_field => { "event_length" => 0 } }
    }
    if [event_length] < 100 {
        aggregate {
            task_id => "%{[@metadata][task_id]}"
            code => 'event.cancel'
        }
    } else {
        aggregate {
            task_id => "%{[@metadata][task_id]}"
            code => ''
            map_action => "update"
            end_of_task => true
        }
    }

This basically always appends the event to the data array. Then, if the size of the array is smaller than my chunk size, I cancel the event (allowing it to keep aggregating); if it has reached my chunk size, I end the task. I added a timeout to the first aggregate so that when the final chunk doesn't add up to the chunk size within 5 seconds, it returns the event and I can process that chunk.
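If I'm reading the docs right, the event that push_map_as_event_on_timeout flushes carries just the map contents plus the usual @timestamp and @version, so the trailing partial chunk should come through looking roughly like this:

{
          "data" => [ ... whatever events were stashed since the last full chunk ... ],
    "@timestamp" => ...,
      "@version" => "1"
}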

I'm sure that I'm not quite doing something 'right' here, and I bet you can spot some places for improvement, but this seems to be working for me. It's taking about 30-40 seconds to update 4k records in my destination, which is a far sight better than I was getting before. So, if you don't reply again, thank you for your assistance. If you do reply with things I missed, that's also appreciated :slight_smile:
