I have a Logstash pipeline that
- fetches data from MySQL using the jdbc input plugin
- aggregates the rows per user, based on the user id
- pushes the aggregated documents to an Elasticsearch cluster
It fetches a large amount of data (e.g. 2 million rows) from the MySQL server and uses cursor fetch with a "jdbc_fetch_size" of 100000, so that it does not load all the rows at once (this is the fetch size, not paging with LIMIT + OFFSET) and avoids an out-of-memory exception.
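To make the distinction explicit, these are the two relevant jdbc input options (paging, which I deliberately do not use, rewrites the query with LIMIT/OFFSET):

# paging: NOT used here, the plugin breaks the statement up with LIMIT/OFFSET
jdbc_paging_enabled => true
jdbc_page_size => 100000

# cursor streaming: what I use, a single query whose rows are pulled in chunks;
# requires useCursorFetch=true in the MySQL connection string
jdbc_fetch_size => 100000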
Below is my configuration:
input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://host/dbname?useCursorFetch=true"
    jdbc_fetch_size => 100000
    jdbc_user => ""
    jdbc_password => ""
    statement => "SELECT *
                  FROM users_list
                  LEFT JOIN user_posts ON users_list.id = user_posts.user_id
                  LEFT JOIN user_friends ON users_list.id = user_friends.user_id
                  ORDER BY users_list.id;"
  }
}
filter {
  aggregate {
    task_id => "%{id}"  # id of the user
    code => "
      # remember the user's scalar fields once
      map['user_id'] ||= event.get('id')
      map['name']    ||= event.get('name')
      map['email']   ||= event.get('email')
      # initialize the arrays before appending, otherwise << fails on nil
      map['posts']   ||= []
      map['friends'] ||= []
      if event.get('post_id') != nil
        map['posts'] << {
          'id'          => event.get('post_id'),
          'title'       => event.get('post_title'),
          'description' => event.get('post_description')
        }
      end
      if event.get('friend_id') != nil
        map['friends'] << {
          'id'   => event.get('friend_id'),
          'name' => event.get('friend_name')
        }
      end
      # drop the raw row; only the aggregated map should reach the output
      event.cancel
    "
    # rows are sorted by user id, so a new id means the previous user is complete
    push_previous_map_as_event => true
    # seconds without a new event for a task before its map is flushed
    timeout => 3
  }
}
output {
  elasticsearch {
    # the pushed event carries user_id (set in the map), not id
    document_id => "%{user_id}"
    index => ""
    hosts => [""]
    user => ""
    password => ""
  }
}
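Note: the aggregate filter only works correctly when all events go through a single worker, so throughout this question I assume Logstash is started with one pipeline worker, e.g.:

bin/logstash -f pipeline.conf -w 1   # or pipeline.workers: 1 in logstash.yml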
Below is my Elasticsearch index mapping:
{
"mappings": {
"properties": {
"user_id": {
"type": "long"
},
"posts": {
"type": "nested",
"properties": {
"id": {
"type": "long"
},
"title": {
"type": "text"
},
"description": {
"type": "text"
}
}
},
"friends": {
"type": "nested",
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text"
}
}
},
"email": {
"type": "text"
},
"name": {
"type": "text"
}
}
}
}
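For clarity, each aggregated document that reaches Elasticsearch should end up looking roughly like this (illustrative values):

{
  "user_id": 42,
  "name": "John",
  "email": "john@example.com",
  "posts": [
    { "id": 1, "title": "First post", "description": "..." }
  ],
  "friends": [
    { "id": 7, "name": "Jane" }
  ]
}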
Everything seems to work, but I have a concern about the aggregation. My questions are:
- How does Logstash handle batched fetching together with aggregation? When Logstash fetches the first batch of 100000 rows from MySQL, the rows belonging to one user can get split: the first batch contains the first half of that user's rows and the next batch, fetched on the second round trip, contains the other half. Isn't it possible that, during the second round trip, the map aggregated so far for that user expires via the timeout, so the previously aggregated data gets flushed by Logstash as an incomplete event? I know this cannot happen if all rows are fetched at once, because then the data is never divided horizontally, but for this amount of data we have to fetch in batches.
- If that is the case, how can we make sure that no data gets lost or overwritten by the second batch while aggregating across batches? (A sketch of the options I am considering follows below.)
- Another question: does Logstash release the first batch of events from memory before it makes the second round trip for the next batch, or does it keep them in memory as well?
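For reference, these are the aggregate filter options I am looking at while reasoning about this (option names are from the plugin docs; the values are placeholders, not a tested fix):

aggregate {
  # ... task_id and code block unchanged ...
  push_previous_map_as_event => true
  timeout => 3600                      # raise it so a map survives a gap between fetches?
  timeout_task_id_field => "user_id"   # write the task id onto any map flushed by timeout
}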