I have the following documents in an index:
{
"demand_id": 1,
"date": "2020-08-20 12:00:00"
},
{
"action_id": 1,
"demand_id": 1,
"client": "renato",
"date": "2020-08-20 12:05:00",
"topic": "ready"
},
{
"action_id": 1,
"demand_id": 1,
"client": "renato",
"date": "2020-08-20 12:10:00",
"topic": "started"
},
{
"action_id": 1,
"demand_id": 1,
"client": "renato",
"result": "abandoned",
"date": "2020-08-20 12:15:00",
"topic": "finished"
},
{
"action_id": 2,
"demand_id": 1,
"client": "matheus",
"date": "2020-08-20 14:13:00",
"topic": "ready"
},
{
"action_id": 2,
"demand_id": 1,
"client": "matheus",
"date": "2020-08-20 14:14:00",
"topic": "started"
},
{
"action_id": 2,
"demand_id": 1,
"client": "matheus",
"result": "approved",
"date": "2020-08-20 14:16:00",
"topic": "finished"
}
I then want to merge them, updating continuously as new documents arrive, into a single document like this:
{
"demand_id": 1,
"date": "2020-08-20 12:00:00",
"logs": [
{
"action_id": 1,
"demand_id": 1,
"client": "renato",
"ready": "2020-08-20 12:05:00",
"started": "2020-08-20 12:10:00",
"result": "abandoned",
"finished": "2020-08-20 12:15:00"
},
{
"action_id": 2,
"demand_id": 1,
"client": "matheus",
"ready": "2020-08-20 14:13:00",
"started": "2020-08-20 14:14:00",
"result": "approved",
"finished": "2020-08-20 14:16:00"
}
]
}
I tried the following Logstash configuration, but it's not working: it keeps only the last record for each action_id:
# Source: read documents from the "teste" index of the local Elasticsearch node.
input {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "teste"
# Six-field cron-style expression; presumably the first field is seconds,
# i.e. the read is repeated every 3 seconds -- TODO confirm against the plugin docs.
# NOTE(review): no query is configured here, so each run presumably re-reads
# the whole index -- confirm.
schedule => "*/3 * * * * *"
}
}
# Transform: reshape each event, then group events with the aggregate filter.
filter {
# Add a field named after the event's "topic" value whose value is the event's
# "date" (e.g. topic "ready" => field "ready"), then drop the original
# "topic"/"date" fields along with "@version" and "@timestamp".
# NOTE(review): events that have no "topic" field (such as the demand document)
# will not resolve "%{topic}" -- verify what field name they end up with.
mutate {
add_field => {"%{topic}" => "%{date}"}
remove_field => ["@version", "@timestamp", "topic", "date"]
}
# Group events by action_id. The code below only initialises a 'tags' entry on
# the map; it never copies any event field (client, ready, started, result, ...)
# into the map, and it builds no "logs" array.
# NOTE(review): the pushed aggregated event therefore presumably carries no
# action data -- confirm.
aggregate {
task_id => "%{action_id}"
code => "
map['tags'] ||= ['aggregated']
"
# Emit the previous task's map as a new event once an event with a different
# task_id arrives; the timeout (5, presumably seconds) flushes the last map.
push_previous_map_as_event => true
timeout => 5
}
}
# Sink: write events to the "teste_novo" index of the local Elasticsearch node.
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "teste_novo"
# The document id comes from demand_id, so every event sharing a demand_id
# (regardless of its action_id) targets the same document.
document_id => "%{demand_id}"
# Update the existing document, creating it from the event if it is missing.
# NOTE(review): a partial update presumably overwrites same-named top-level
# fields, which would explain only the last action's values surviving -- confirm.
action => "update"
doc_as_upsert => true
}
}