How to merge documents from the Logstash output plugin into an Elasticsearch index while indexing

I have an Elasticsearch index with the structure below; here is a sample document:

{
  "studentId": 12345,
  "studentName": "abc",
  "age": 10,
  "tests": [
    {
      "testId": 100,
      "score": 70
    },
    {
      "testId": 101,
      "score": 60
    }
  ]
}

I also have a Logstash pipeline that runs every 15 minutes and fetches the student records from MySQL that were updated since the last run, based on the updated_time column, which is defined as timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP.

The table structure is as follows:

--Student
id, name, age, updated_time

--StudentTest
id, student_id, test_id, score, updated_time
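
For reference, a minimal sketch of how the updated_time columns are defined (only the updated_time definition is taken from above; the types of the other columns are my assumption):

CREATE TABLE student_test (
  id INT PRIMARY KEY AUTO_INCREMENT,
  student_id INT NOT NULL,
  test_id INT NOT NULL,
  score INT,
  -- bumped automatically on every insert and update; the pipeline filters on this
  updated_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);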

and below is my Logstash pipeline:

input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost/mydb"
    jdbc_user => ""
    jdbc_password => ""
    schedule => "*/15 * * * *"
    tracking_column_type => "timestamp"
    # alias the columns to the names the aggregate filter expects, filter on the
    # update timestamps, and order by student id so all rows of one student
    # arrive consecutively (required by push_previous_map_as_event below)
    statement => "select s.id as student_id, s.name as student_name, s.age,
        st.test_id, st.score as test_score
        from student s
        left join student_test st on s.id = st.student_id
        where s.updated_time > current_timestamp() - INTERVAL 15 MINUTE
           or st.updated_time > current_timestamp() - INTERVAL 15 MINUTE
        order by s.id;"
  }
}
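
To show what the filter below has to work with: the jdbc input emits one event per joined row, so student 12345 with its two tests arrives as two flat events, roughly like this (values taken from the sample data above):

{ "student_id" => 12345, "student_name" => "abc", "age" => 10, "test_id" => 100, "test_score" => 70 }
{ "student_id" => 12345, "student_name" => "abc", "age" => 10, "test_id" => 101, "test_score" => 60 }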

filter {
  # collapse the per-row JDBC events into one event per student
  # (note: the aggregate filter requires a single pipeline worker, i.e. -w 1)
  aggregate {
    task_id => "%{student_id}"
    code => "
      map['studentId'] ||= event.get('student_id')
      map['studentName'] ||= event.get('student_name')
      map['age'] ||= event.get('age')
      map['tests'] ||= []

      # rows without a test (from the left join) contribute no array entry
      if event.get('test_id') != nil
        map['tests'] << {
          'testId' => event.get('test_id'),
          'score' => event.get('test_score')
        }
      end

      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
  }
}

output {
  elasticsearch {
    # the aggregated event carries studentId, not the row-level student_id
    document_id => "%{studentId}"
    document_type => "_doc"
    index => "students"
  }
  stdout {
    codec => rubydebug
  }
}
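
With the aggregate filter in place, those two row events are collapsed and pushed as a single event per student, so the stdout output looks roughly like this (rubydebug formatting simplified, @timestamp and other metadata omitted):

{
  "studentId" => 12345,
  "studentName" => "abc",
  "age" => 10,
  "tests" => [
    { "testId" => 100, "score" => 70 },
    { "testId" => 101, "score" => 60 }
  ]
}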

Now suppose the document for student 12345 shown above is already in Elasticsearch, and a new row is inserted into StudentTest for student 12345 with test_id 102 and score 80. When the pipeline runs 15 minutes later, it fetches only this new row (because of the timestamp filter), and the resulting event overwrites the existing document, so the tests array ends up containing only test 102 instead of tests 100, 101 and 102.

How can I merge the existing tests array in the Elasticsearch document with the newly inserted test 102 coming from the SQL data and the aggregate filter, so that the final document eventually looks like this?

{
  "studentId": 12345,
  "studentName": "abc",
  "age": 10,
  "tests": [
    {
      "testId": 100,
      "score": 70
    },
    {
      "testId": 101,
      "score": 60
    },
    {
      "testId": 102,
      "score": 80
    }
  ]
}
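
One direction I have been looking at is the scripted update support in the elasticsearch output plugin (the action, script, script_type, script_lang and scripted_upsert options). Below is a sketch of the kind of output block I imagine, with an untested Painless script of my own that upserts the document and appends only the tests that are not already there:

output {
  elasticsearch {
    index => "students"
    document_id => "%{studentId}"
    action => "update"
    # let the script also create the document if it does not exist yet
    scripted_upsert => true
    script_type => "inline"
    script_lang => "painless"
    script => "
      // on a fresh upsert ctx._source starts out empty
      if (ctx._source.tests == null) { ctx._source.tests = []; }
      ctx._source.studentId = params.event.studentId;
      ctx._source.studentName = params.event.studentName;
      ctx._source.age = params.event.age;
      for (def t : params.event.tests) {
        boolean seen = false;
        for (def e : ctx._source.tests) {
          if (e.testId == t.testId) { seen = true; e.score = t.score; }
        }
        if (!seen) { ctx._source.tests.add(t); }
      }
    "
  }
}

Is something along these lines the right approach, or is there a better way to merge the existing array with the new rows?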
