Sending all source documents into one destination field

I have the following documents in an index:

{
"demand_id": 1,
"date": "2020-08-20 12:00:00"
},
{
"action_id": 1,
"demand_id": 1,
"client": "renato",
"date": "2020-08-20 12:05:00",
"topic": "ready"
},
{
"action_id": 1,
"demand_id": 1,
"client": "renato",
"date": "2020-08-20 12:10:00",
"topic": "started"
},
{
"action_id": 1,
"demand_id": 1,
"client": "renato",
"result": "abandoned",
"date": "2020-08-20 12:15:00",
"topic": "finished"
},
{
"action_id": 2,
"demand_id": 1,
"client": "matheus",
"date": "2020-08-20 14:13:00",
"topic": "ready"
},
{
"action_id": 2,
"demand_id": 1,
"client": "matheus",
"date": "2020-08-20 14:14:00",
"topic": "started"
},
{
"action_id": 2,
"demand_id": 1,
"client": "matheus",
"result": "approved",
"date": "2020-08-20 14:16:00",
"topic": "finished"
}

And I want to continuously merge them into a single document like this:

{
    "demand_id": 1,
    "date": "2020-08-20 12:00:00"
    "logs": [
        {
            "action_id": 1,
            "demand_id": 1,
            "client": "renato",
            "ready": "2020-08-20 12:05:00",
            "started": "2020-08-20 12:10:00",
            "result": "abandoned",
            "finished": "2020-08-20 12:15:00"
        },
        {
            "action_id": 2,
            "demand_id": 1,
            "client": "matheus",
            "ready": "2020-08-20 14:13:00",
            "started": "2020-08-20 14:14:00",
            "result": "approved",
            "finished": "2020-08-20 14:16:00"
        }
    ]
}

I tried the following code, but it's not working. It keeps only the last record for each action_id:

input {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "teste"
    schedule => "*/3 * * * * *"
  }
}

filter {
  mutate {
    add_field => {"%{topic}" => "%{date}"}
    remove_field => ["@version", "@timestamp", "topic", "date"]
  }
  aggregate {
    task_id => "%{action_id}"
    code => "
            map['tags'] ||= ['aggregated']
         "
    push_previous_map_as_event => true
    timeout => 5
  }
} 

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "teste_novo"
    document_id => "%{demand_id}"
    action => "update"
    doc_as_upsert => true
  }
}

When using aggregate to push the map on a timeout, you need to build the entire event you want inside the map.

This code will work, but you should add error checking to it, and note that dropping the first element of the array assumes the action_id values start at 1 and are consecutive. A ruby filter with more general code that deletes nil items from the array may be more appropriate (see the sketch after the config below).

    aggregate {
        task_id => "%{demand_id}"
        code => '
            map["logs"] ||= []
            action = event.get("action_id")
            if action
                # bucket the log entry by action_id and fill it in as events arrive
                map["logs"][action] ||= {}
                map["logs"][action]["action_id"] = action
                map["logs"][action]["client"] ||= event.get("client")
                map["logs"][action]["demand_id"] ||= event.get("demand_id")
                # store the date under a field named after the topic
                case event.get("topic")
                when "ready"
                    map["logs"][action]["ready"] = event.get("date")
                when "started"
                    map["logs"][action]["started"] = event.get("date")
                when "finished"
                    map["logs"][action]["finished"] = event.get("date")
                    map["logs"][action]["result"] = event.get("result")
                end
            else
                # the parent demand document has no action_id, only a date
                map["date"] = event.get("date")
            end

            map["tags"] ||= ["aggregated"]
            event.cancel
        '
        timeout_task_id_field => "demand_id"
        push_previous_map_as_event => true
        timeout => 5
    }
    # action_id starts at 1, so index 0 of the array is always nil; drop it
    ruby { code => 'event.set("logs", event.get("logs").drop(1))' }
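
As a sketch of that more general approach (untested, and assuming logs is always an array on the pushed event), you could remove every nil entry instead of relying on consecutive action_id values:

    ruby {
        code => '
            logs = event.get("logs")
            # drop the nil placeholders left at unused array indexes
            event.set("logs", logs.compact) if logs.is_a?(Array)
        '
    }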

Thanks man! That worked like a charm!

Based on the timestamps of the events, some of them can be a long time apart from each other. Should I set that interval as the timeout (inside the aggregate)?

The timeout has to be longer than the longest interval between the first and last events of a task.
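
For example, if the events of a single demand can be up to an hour apart, the timeout (in seconds) has to be raised to at least that much. A sketch with an illustrative value:

    aggregate {
        task_id => "%{demand_id}"
        code => '...'                        # same aggregation code as above
        timeout_task_id_field => "demand_id"
        push_previous_map_as_event => true
        timeout => 3600                      # must exceed the longest first-to-last gap for a demand
    }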
