Hello,
I am struggling to get some filtering done so that the information ends up in Elasticsearch in the format I want. First, I have a MySQL table that I process in the input stage (no problem in that part). I want some of the columns to be output under a common nested field. E.g.:
author: Authorname
performer: Performername
(these are two columns in MySQL)
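For context, after the input stage each row reaches the filter stage as a flat event, roughly like this (values illustrative):
{
  "id" => 1,
  "author" => "Authorname",
  "performer" => "Performername",
  "original_title" => "The original title name",
  "alternative_title" => "An alternative title name"
}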
And I want those two columns to end up like this in Elasticsearch:
"artists": [{
"artist_name": "Authorname",
"artist_role": "AUTHOR"
},
{
"artist_name": "Performername",
"artist_role": "PERFORMER"
}]
Similarly, I have two columns, "original_title" and "alternative_title", which I want processed in the same way as the case above, resulting in:
"titles": [{
"title_name": "The original title name",
"title_type": "ORIGINAL"
},
{
"title_name": "An alternative title name",
"title_type": "ALTERNATIVE"
}]
I have been playing a lot with aggregate and mutate but I cannot get to a solution. I sometimes get one of the two cases partially done, but then the other one does not appear in Elasticsearch, and the fields that are not going to be nested are not processed at all. I guess I am not understanding how aggregate works in terms of timeouts and data processing. Take into account that I will have several rows, and I want that format applied to each row, with each row translated to its own corresponding Elasticsearch document.
I leave here my current filter so that someone can shed some light on it:
filter {
  aggregate {
    task_id => "%{id}" ### I do not know exactly what id to use here
    code => "
      map['artists'] ||= []
      map['artists'] << {'artist_name' => event.get('author'), 'artist_role' => 'AUTHOR'}
      map['artists'] << {'artist_name' => event.get('performer'), 'artist_role' => 'PERFORMER'}
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
  aggregate {
    task_id => "%{id}" ### I do not know exactly what id to use here
    code => "
      map['titles'] ||= []
      map['titles'] << {'title_name' => event.get('original_title'), 'title_type' => 'ORIGINAL'}
      map['titles'] << {'title_name' => event.get('alternative_title'), 'title_type' => 'ALTERNATIVE'}
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
  mutate {
    copy => { "id" => "[@metadata][_id]" }
    rename => {
      "producer" => "[producer_name][name]"
    }
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
UPDATE: If I change the task_ids of the two aggregate filters:
task_id => "%{author}"
...
task_id => "%{original_title}"
I get the following in Elasticsearch, which is OK for the part that is being processed, but it is missing the other nested field, all the regular fields, and the _id is also messed up:
{
  "_index": "test_logstash",
  "_type": "_doc",
  "_id": "%{[@metadata][_id]}",
  "_version": 1,
  "_score": 0,
  "_source": {
    "@timestamp": "2022-05-16T12:58:00.090646200Z",
    "artists": [
      {
        "artist_name": "Authorname",
        "artist_role": "AUTHOR"
      },
      {
        "artist_name": "Performername",
        "artist_role": "PERFORMER"
      }
    ]
  }
}
In summary, the key point is that I do not want to group rows that share, e.g., the same title; I just want to rearrange, row by row, some of each row's columns into this nested structure.
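To illustrate the per-row rearrangement I am after, I imagine something like the following plain ruby filter could express it (just a sketch of the intent, not something I have verified):
ruby {
  code => "
    # Per event (row): build the two nested arrays from the flat columns.
    event.set('artists', [
      { 'artist_name' => event.get('author'),    'artist_role' => 'AUTHOR' },
      { 'artist_name' => event.get('performer'), 'artist_role' => 'PERFORMER' }
    ])
    event.set('titles', [
      { 'title_name' => event.get('original_title'),    'title_type' => 'ORIGINAL' },
      { 'title_name' => event.get('alternative_title'), 'title_type' => 'ALTERNATIVE' }
    ])
  "
}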
UPDATE 2:
I have moved forward slightly, and am now able to get both nested fields processed by using this code:
aggregate {
  task_id => "%{id}"
  code => "
    map['artists'] ||= []
    map['artists'] << {'artist_name' => event.get('author'), 'artist_role' => 'AUTHOR'}
    map['artists'] << {'artist_name' => event.get('performer'), 'artist_role' => 'PERFORMER'}
    map['titles'] ||= []
    map['titles'] << {'title_name' => event.get('original_title'), 'title_type' => 'ORIGINAL'}
    map['titles'] << {'title_name' => event.get('alternative_title'), 'title_type' => 'ALTERNATIVE'}
    event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 3
}
mutate {
  copy => { "id" => "[@metadata][_id]" }
  rename => {
    "series_name" => "[series_name][name]"
  }
  remove_field => ["id", "@version", "unix_ts_in_secs"]
}
However, the rest of the fields (columns) are still not processed, which produces a situation similar to the Elasticsearch output shown above.
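For reference, the complete document I am aiming for would look roughly like this (values illustrative, the remaining plain columns elided):
{
  "artists": [
    { "artist_name": "Authorname", "artist_role": "AUTHOR" },
    { "artist_name": "Performername", "artist_role": "PERFORMER" }
  ],
  "titles": [
    { "title_name": "The original title name", "title_type": "ORIGINAL" },
    { "title_name": "An alternative title name", "title_type": "ALTERNATIVE" }
  ],
  "series_name": { "name": "Seriesname" },
  ...
}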
Thank you very much in advance.
Best regards,
Alejandro.-