I have an Elasticsearch index with the following structure and sample data:
{
  "studentId": 12345,
  "studentName": "abc",
  "age": 10,
  "tests": [
    {
      "testId": 100,
      "score": 70
    },
    {
      "testId": 101,
      "score": 60
    }
  ]
}
I also have a Logstash instance running a pipeline every 15 minutes, which fetches student records from MySQL that have been updated since the last run. The incremental fetch is based on the updated_time column, defined as:
timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
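(As an aside, I'm aware the jdbc input can track the last run itself via `:sql_last_value` instead of a fixed 15-minute window; the tracking options below are from the plugin, but the exact query rewrite is my own sketch, not something I have verified:)

```
input {
  jdbc {
    # ... connection settings as in my pipeline below ...
    use_column_value      => true
    tracking_column       => "updated_time"   # must be returned by the query
    tracking_column_type  => "timestamp"
    statement => "SELECT s.id AS student_id, s.name AS student_name, s.age,
                         st.test_id, st.score AS test_score,
                         GREATEST(s.updated_time, st.updated_time) AS updated_time
                  FROM student s
                  LEFT JOIN student_test st ON s.id = st.student_id
                  WHERE s.updated_time > :sql_last_value
                     OR st.updated_time > :sql_last_value"
  }
}
```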
Following is the table structure:

Student:     id, name, age, updated_time
StudentTest: id, student_id, test_id, score, updated_time
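For reference, this is my assumption of the DDL behind those tables (column names from the structure above; the types are my guess):

```
CREATE TABLE student (
  id           INT PRIMARY KEY,
  name         VARCHAR(100),
  age          INT,
  updated_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

CREATE TABLE student_test (
  id           INT PRIMARY KEY,
  student_id   INT REFERENCES student(id),
  test_id      INT,
  score        INT,
  updated_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
```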
And below is my Logstash pipeline:
input {
  jdbc {
    jdbc_driver_class      => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost/mydb"
    jdbc_user              => ""
    jdbc_password          => ""
    schedule               => "*/15 * * * *"
    tracking_column_type   => "timestamp"
    statement => "SELECT s.id AS student_id, s.name AS student_name, s.age,
                         st.test_id, st.score AS test_score
                  FROM student s
                  LEFT JOIN student_test st ON s.id = st.student_id
                  WHERE s.updated_time > (CURRENT_TIMESTAMP - INTERVAL 15 MINUTE)
                     OR st.updated_time > (CURRENT_TIMESTAMP - INTERVAL 15 MINUTE);"
  }
}
filter {
  aggregate {
    task_id => "%{student_id}"
    code => "
      map['studentId']   ||= event.get('student_id')
      map['studentName'] ||= event.get('student_name')
      map['age']         ||= event.get('age')
      map['tests']       ||= []
      if event.get('test_id')
        map['tests'] << {
          'testId' => event.get('test_id'),
          'score'  => event.get('test_score')
        }
      end
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
  }
}
output {
  elasticsearch {
    index         => "students"
    document_id   => "%{student_id}"
    document_type => "_doc"
  }
  stdout {
    codec => rubydebug
  }
}
Now, given the existing document for student 12345 shown above, suppose a new row is inserted into StudentTest for that student with test_id 102 and score 80. When the pipeline runs 15 minutes later, it fetches only this new row (because of the timestamp condition), and the resulting document overwrites the existing one in the index: tests 100 and 101 are replaced by test 102 alone. How can I merge the existing tests array in the Elasticsearch document with the newly inserted test_id 102 record coming from the SQL data and the aggregate filter, so that the final document eventually looks like this?
{
  "studentId": 12345,
  "studentName": "abc",
  "age": 10,
  "tests": [
    {
      "testId": 100,
      "score": 70
    },
    {
      "testId": 101,
      "score": 60
    },
    {
      "testId": 102,
      "score": 80
    }
  ]
}
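One approach I have been looking at (untested sketch): instead of a plain index action, use a scripted update in the elasticsearch output so new tests are appended to the existing array server-side. The plugin's `action`, `scripted_upsert`, `script_type`, `script_lang`, and `script` options exist, and by default the event is exposed to the script as `params.event`; the Painless merge logic below is my own assumption of how the append/dedup would work, not something I have verified:

```
output {
  elasticsearch {
    index           => "students"
    document_id     => "%{student_id}"
    action          => "update"
    scripted_upsert => true
    script_type     => "inline"
    script_lang     => "painless"
    # merge incoming tests into the stored array, skipping testIds already present
    script => "
      if (ctx._source.tests == null) {
        ctx._source.tests = params.event.tests;
      } else {
        for (t in params.event.tests) {
          boolean exists = false;
          for (e in ctx._source.tests) {
            if (e.get('testId') == t.get('testId')) { exists = true; }
          }
          if (!exists) { ctx._source.tests.add(t); }
        }
      }
      ctx._source.studentId   = params.event.studentId;
      ctx._source.studentName = params.event.studentName;
      ctx._source.age         = params.event.age;
    "
  }
}
```

With `scripted_upsert => true` the script also runs for documents that don't exist yet, which is why the `null` check on `ctx._source.tests` is needed. Is this the right direction, or is there a cleaner way to do the merge?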