Using aggregate filter to merge different events

Hi all,
there is a log which some evenets of it have a same task_id

A1=12;A2=gty;A3=tsbr;A4=5798
B1=adr;B2=156;B3=765
A1=12;A2=gty;A3=tsbr;A4=5798;A5=895
B1=adr;B2=156;B3=765;B4=khu

as following:
task_id=1 => A1=12;A2=gty;A3=tsbr;A4=5798
task_id=1 =>B1=adr;B2=156;B3=765
task_id=2 => A1=12;A2=gty;A3=tsbr;A4=5798;A5=895
task_id=2 =>B1=adr;B2=156;B3=765;B4=khu

i want to merge these data based on their task_id so that the output of merging these lines be as following:

for task_id=1:
A1=12;A2=gty;A3=tsbr;A4=5798;B1=adr;B2=156;B3=765
fot task_id=2:
A1=12;A2=gty;A3=tsbr;A4=5798;A5=895;B1=adr;B2=156;B3=765;B4=khu

could you please advise me about this?

If your input is strictly alternating lines, one starting with A and one not, then this would work provided you run with '--pipeline.workers 1 --java-execution false'

    mutate { add_field => { "[@metadata][constant]" => "1" } }
    if [message] =~ /^A/ {
        aggregate {
            task_id => "%{[@metadata][constant]}"
            map_action => "create"
            code => '
                map["savedMessage"] = event.get("message")
                event.cancel
            '
        }
    } else {
        aggregate {
            task_id => "%{[@metadata][constant]}"
            map_action => "update"
            end_of_task => true
            code => '
                event.set("message", map["savedMessage"] + ";" + event.get("message"))
            '
        }
    }

It will not handle any variation in the pairing of lines.

surry for my mistake. it is just an axample, and actually it can be as following:

acc1=32;body=hgu;cel=5584;.....

A and B just are example; the important things are that : these lines have same task_id and aggregation should be based on the task_id so that concatenate line 2 to end of line 1 with an ";" sign. the problem is how can do these
lets look differently; there are two event as following:
{
task_id : 1
message: "ac1=12;bd2=gty;cell=tsbr;id=5798;no=895"
}

{
task_id:1
message: "BX1=adr;cell2=156;txn=765;no2=khu"
}

the desire output is as following:

{
task_id:1
message: "ac1=12;bd2=gty;cell=tsbr;id=5798;no=895;BX1=adr;cell2=156;txn=765;no2=khu"
}

You could try

    aggregate {
        task_id => "%{[task_id]}"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "task_id"
        timeout => 5
        code => '
            map["message"] ||= ""
            if map["message"] == ""
                map["message"] = event.get("message")
            else
                map["message"] += ";" + event.get("message")
            end
            event.cancel
        '
    }

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.