I am repeatedly fetching rows from a database and inserting them into Elasticsearch, using the row's unique key as the document_id. Fields that are missing from the existing document should be added, and fields that already exist on it (including hashes such as attributes) should be overwritten with the new values. How can I do this?
Here are the filter and output sections of my pipeline:
filter {
  if "my_index" in [tags] {
    # Merge the attribute name/value rows for one component instance
    # into a single event carrying an 'attributes' hash.
    aggregate {
      task_id => "%{componentinstance.id}"
      code => "
        map['id'] = event.get('componentinstance.id')
        map['name'] = 'Test Data'
        map['@timestamp'] = event.get('@timestamp')
        map['attributes'] ||= {}
        map['attributes'][event.get('attributename')] = event.get('attributevalue')
        event.cancel()
      "
      push_previous_map_as_event => true
      timeout => 3
    }
    mutate {
      add_field => { "custom_index_name" => "my_index" }
    }
  } else {
    mutate {
      add_field => { "custom_index_name" => "%{type}" }
    }
  }
}
output {
  elasticsearch {
    hosts         => ["http://localhost:9200"]
    index         => "my_index"
    document_id   => "%{id}"
    document_type => "%{type}"
    action        => "update"
    doc_as_upsert => true
  }
}
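If I understand it correctly, with action => "update" and doc_as_upsert => true each aggregated event is sent as a partial update, roughly equivalent to the request below (id taken from the sample event). Elasticsearch merges the given doc into the stored _source, and for object fields that merge is recursive, so keys missing from the new doc are simply left alone:

POST my_index/_update/375283
{
  "doc": {
    "name": "Test Data",
    "attributes": { "Name": "Sunny", "Age": "24" }
  },
  "doc_as_upsert": true
}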
Output from stdout:
{
    "tags" => [
        [0] "_aggregatefinalflush"
    ],
    "@timestamp" => 2023-04-24T09:56:47.000Z,
    "attributes" => {
        "Name" => "Jack",
        "Age" => "27",
        "City" => "Chennai"
    },
    "@version" => "1",
    "name" => "Test Data",
    "id" => 375283,
    "custom_index_name" => "my_index"
}
New attributes coming from SQL are added correctly, but attributes that have disappeared from the source data stay on the existing document; they are never removed.
Old data:
"attributes" => {
    "Name" => "Jack",
    "Age" => "27",
    "City" => "Chennai"
}
New data:
"attributes" => {
    "Name" => "Sunny",
    "Age" => "24"
}
Current output:
"attributes" => {
    "Name" => "Sunny",
    "Age" => "24",
    "City" => "Chennai"   # stale: "City" is not removed; this is the issue
}
Expected output:
"attributes" => {
    "Name" => "Sunny",
    "Age" => "24"
}
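The only fix I can think of is to replace each top-level field wholesale instead of deep-merging, for example with a scripted update in the elasticsearch output. The sketch below assumes the whole attributes hash should be overwritten while other top-level fields are still merged; scripted_upsert, script, script_type and script_lang are documented options of the output plugin, but the Painless one-liner itself is untested on my side (document_type is dropped here since mapping types are deprecated anyway). Is something like this the right approach?

output {
  elasticsearch {
    hosts       => ["http://localhost:9200"]
    index       => "my_index"
    document_id => "%{id}"
    action      => "update"
    # Untested idea: putAll() overwrites every top-level key wholesale,
    # so the whole 'attributes' hash is replaced and stale keys like
    # 'City' disappear, while stored fields absent from the event are kept.
    script          => "ctx._source.putAll(params.event)"
    script_type     => "inline"
    script_lang     => "painless"
    # Let the script also handle inserts of brand-new documents.
    scripted_upsert => true
  }
}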