Hi guys, I'm a little bit confused about how to use the aggregate filter. I've followed the documentation and read some articles.
I used a sample database in MySQL (the Sakila database)
and created the following view:
VIEW full_movie_details
AS
SELECT
f
.film_id
AS film_id
,
f
.title
AS title
,
f
.rating
AS rating
,
l
.name
AS language
,
c
.name
AS category
,
a
.first_name
AS first_name
,
a
.last_name
AS last_name
,
f
.release_year
AS release_year
,
f
.description
AS description
,
f
.length
AS length
FROM
(((((film
f
JOIN film_category
fc
)
JOIN category
c
)
JOIN language
l
)
JOIN film_actor
fa
)
JOIN actor
a
)
WHERE
((f
.film_id
= fc
.film_id
)
AND (fc
.category_id
= c
.category_id
)
AND (f
.film_id
= fa
.film_id
)
AND (fa
.actor_id
= a
.actor_id
)
AND (f
.language_id
= l
.language_id
))
ORDER BY f
.film_id
This is my Logstash configuration:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql:/<ipaddress>:3306/sakila"
jdbc_user => "logstash"
jdbc_password => "Generationx-1"
jdbc_driver_library => "/opt/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
#Our query to fetch from database
#statement => "SELECT * FROM full_movie_details"
statement => "SELECT film_id,title, rating, language, category,first_name, last_name , release_year, description, length FROM full_movie_details"
lowercase_column_names => true
last_run_metadata_path => "/opt/logstash-7.6.2/.logstash_jdbc_last_run"
tracking_column => film_id
tracking_column_type => "numeric"
use_column_value => true
}
filter {
aggregate {
task_id => "%{title}"
code => "
map['film_id'] = event.get('film_id')
map['title'] ||= event.get('title')
map['actors'] ||= []
map['actors'] << {'first_name' => event.get('first_name'),'last_name' => event.get('last_name') }
event.cancel()
"
push_previous_map_as_event => true
timeout => 20
}
}
output {
elasticsearch {
hosts => "https://instance:9200"
user => "elastic"
password => "xxxxxxxx"
index => "sakila-full_movie_details"
action => "update"
document_id => "%{film_id}"
document_type => "_doc"
doc_as_upsert => true
codec => "json"
}
stdout { codec => json_lines }
}
When I query a specific id in this view — for example, searching by film title — it returns 6 actors, but when I ingest it into Elasticsearch using Logstash, my nested field contains only 3 actors:
SELECT * FROM sakila.full_movie_details where title = 'AMADEUS HOLY';
Document in Elasticsearch:
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "sakila-full_movie_details",
"_type" : "_doc",
"_id" : "19",
"_version" : 2,
"_score" : 1.0,
"_source" : {
"actors" : [
{
"last_name" : "LOLLOBRIGIDA",
"first_name" : "JOHNNY"
},
{
"last_name" : "BOLGER",
"first_name" : "VAL"
},
{
"last_name" : "PITT",
"first_name" : "JAMES"
}
],
"film_id" : 19,
"@timestamp" : "2020-05-17T08:14:46.868Z",
"@version" : "1",
"title" : "AMADEUS HOLY"
}
}
]
}
}
The aggregation is working as expected, but the data is incomplete.