JDBC one-to-many relationship to Logstash

Hi guys, I'm a little bit confused about how to use the aggregate filter. I've followed the documentation and read some articles.

I used a sample database in MySQL (the Sakila database)

and created a view in the MySQL Sakila database:


-- Flattened film catalogue: one row per (film, category, actor) combination,
-- with the film's language and category names repeated on every row.
-- A film with N actors therefore yields N rows (times the number of
-- categories it is linked to through film_category).
-- Rows are ordered by film_id so that all rows of one film are contiguous,
-- which is what a downstream "previous row" aggregation relies on.
CREATE OR REPLACE VIEW full_movie_details AS
SELECT
    f.film_id      AS film_id,
    f.title        AS title,
    f.rating       AS rating,
    l.name         AS language,
    c.name         AS category,
    a.first_name   AS first_name,
    a.last_name    AS last_name,
    f.release_year AS release_year,
    f.description  AS description,
    f.length       AS length
FROM film AS f
INNER JOIN film_category AS fc
    ON fc.film_id = f.film_id
INNER JOIN category AS c
    ON c.category_id = fc.category_id
INNER JOIN language AS l
    ON l.language_id = f.language_id
INNER JOIN film_actor AS fa
    ON fa.film_id = f.film_id
INNER JOIN actor AS a
    ON a.actor_id = fa.actor_id
ORDER BY f.film_id;


This is my Logstash configuration:

input {
    # Reads the flattened film/actor rows from the MySQL view; every result
    # row becomes one Logstash event.
    jdbc {
        # A JDBC URL needs "//" between the sub-protocol and the host.
        jdbc_connection_string => "jdbc:mysql://<ipaddress>:3306/sakila"
        jdbc_user => "logstash"
        # NOTE(review): plaintext credential; consider moving it to the
        # Logstash keystore or an environment variable reference.
        jdbc_password => "Generationx-1"
        jdbc_driver_library => "/opt/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"

        # Query to fetch from the database. The explicit ORDER BY keeps all
        # rows of one film contiguous, which the aggregate filter relies on
        # when push_previous_map_as_event is enabled.
        statement => "SELECT film_id, title, rating, language, category, first_name, last_name, release_year, description, length FROM full_movie_details ORDER BY film_id"
        lowercase_column_names => true

        last_run_metadata_path => "/opt/logstash-7.6.2/.logstash_jdbc_last_run"

        # The last seen film_id is persisted to last_run_metadata_path.
        # NOTE(review): the statement does not reference :sql_last_value, so
        # every run still re-reads the whole view.
        tracking_column => "film_id"
        tracking_column_type => "numeric"
        use_column_value => true
    }
}

filter {
    # Collapses the one-row-per-actor events into a single event per film
    # carrying an "actors" array.
    #
    # IMPORTANT: this filter only works when events are processed in order on
    # a single worker thread. Run Logstash with `-w 1` (or set
    # `pipeline.workers: 1` for this pipeline). With several workers the rows
    # of one film are split across threads, a map is pushed before all of its
    # rows have been seen, and the later partial document overwrites the
    # earlier one in Elasticsearch, leaving only some of the actors.
    aggregate {
        # Key the aggregation on the same column that is used as the
        # Elasticsearch document id, so one map corresponds to one document.
        task_id => "%{film_id}"
        # Each source row adds one actor to the map and is then cancelled;
        # only the aggregated map is emitted as an event.
        code => "
            map['film_id'] = event.get('film_id')
            map['title'] ||= event.get('title')
            map['actors'] ||= []
            map['actors'] << {'first_name' => event.get('first_name'),'last_name' => event.get('last_name') }
            event.cancel()
        "
        # Emit the finished map as a new event as soon as a row with a
        # different task_id arrives.
        push_previous_map_as_event => true
        # The map of the very last film has no following row to push it, so
        # it is flushed after 20 seconds of inactivity.
        timeout => 20
    }

}

output {
   # Upsert one document per film into Elasticsearch.
   elasticsearch {
      # Connection and credentials
      hosts => ["https://instance:9200"]
      user => "elastic"
      password => "xxxxxxxx"

      # Target index and document identity
      index => "sakila-full_movie_details"
      document_type => "_doc"
      document_id => "%{film_id}"

      # Update an existing document, or create it from the event when the
      # id does not exist yet.
      action => "update"
      doc_as_upsert => true

      codec => "json"
   }

   # Also print every event to the console, one JSON object per line.
   stdout { codec => json_lines }

}

When I query this view for a specific film (for example, by film title), it returns 6 actors, but when I ingest it into Elasticsearch using Logstash, my nested field contains only 3 actors:

-- Ad-hoc check: list every row (one per actor) the view returns for one film.
SELECT *
FROM sakila.full_movie_details
WHERE title = 'AMADEUS HOLY';



Document in Elasticsearch:

{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "sakila-full_movie_details",
"_type" : "_doc",
"_id" : "19",
"_version" : 2,
"_score" : 1.0,
"_source" : {
"actors" : [
{
"last_name" : "LOLLOBRIGIDA",
"first_name" : "JOHNNY"
},
{
"last_name" : "BOLGER",
"first_name" : "VAL"
},
{
"last_name" : "PITT",
"first_name" : "JAMES"
}
],
"film_id" : 19,
"@timestamp" : "2020-05-17T08:14:46.868Z",
"@version" : "1",
"title" : "AMADEUS HOLY"
}
}
]
}
}


The aggregation is working as expected, but the data is incomplete.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.