Hey there.
I am trying to use Logstash to send data to Elasticsearch (ES) and I have run into a problem I have not been able to solve yet:
Consider a table where the important columns are:
proc_id | title | operations_id | products_id | keywords_name | keywords_id
Sample data:
This table is generated by a query and converted to JSON documents with Logstash + JDBC (a sketch of the jdbc input is included below). The problem is:
when I run Logstash, it does not aggregate all of the "operations_id" and "products_id" values; it looks like it is skipping some rows. So, considering the table above (in Google Docs), sometimes the document is stored in ES like this:
{
  "id": 510,
  "products_id": [10, 20, 30, 40],
  ...
}
If I drop the index and run it again, it looks like:
{
  "id": 510,
  "products_id": [50],
  ...
}
But the correct result would be:
{
  "id": 510,
  "products_id": [10, 20, 30, 40, 50],
  ...
}
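For reference, this is roughly what my jdbc input looks like. The connection settings, driver, and table names here are placeholders, and the statement is a simplified version of the real query (which also selects description_raw and the scripts_* and categories_* columns), but the shape is the same: the joins make each proc_id come back as several rows.

input {
  jdbc {
    # Placeholder connection settings:
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/mydb"
    jdbc_user => "user"
    jdbc_password => "password"
    jdbc_driver_library => "/path/to/postgresql.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    # Simplified query; the joins produce one row per
    # (operation, product, keyword) combination for each procedure:
    statement => "SELECT p.proc_id, p.title, o.operations_id, pr.products_id, k.keywords_name, k.keywords_id
                  FROM procedures p
                  LEFT JOIN operations o ON o.proc_id = p.proc_id
                  LEFT JOIN products pr ON pr.proc_id = p.proc_id
                  LEFT JOIN keywords k ON k.proc_id = p.proc_id
                  ORDER BY p.proc_id"
  }
}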
The filter section of my logstash.conf looks like this:
filter {
  aggregate {
    task_id => "%{proc_id}" # this is the ID of the procedure, the main unique ID
    code => "
      map['id'] = event.get('proc_id')
      map['title'] = event.get('title')
      map['description_raw'] = event.get('description_raw')
      map['scripts'] ||= []
      map['scripts'] << {
        'title_raw' => event.get('scripts_title_raw'),
        'content_raw' => event.get('scripts_content_raw')
      }
      map['categories'] ||= []
      map['categories'] << event.get('categories_name')
      map['keywords'] ||= []
      map['keywords'] << event.get('keywords_name')
      map['operations'] ||= []
      map['operations'] << event.get('operations_id')
      map['products'] ||= []
      map['products'] << event.get('products_id')
    "
    push_previous_map_as_event => true
    timeout => 5000
    timeout_tags => ['aggregated']
  }

  if "aggregated" not in [tags] {
    drop {}
  }
  # Remove fields that are not needed in Elasticsearch:
  else {
    mutate {
      remove_field => ["proc_id", "categories_name", "categories_id", "keywords_id", "keywords_name", "scripts_id", "operations_id", "products_id"]
    }
  }
}
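And the output section is essentially the stock elasticsearch output; the hosts and index name here are placeholders, and I set document_id from the aggregated id so each procedure should map to exactly one ES document:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "procedures"
    document_id => "%{id}"
  }
}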
Any help is much appreciated. I do not know how to reliably aggregate all of the "products_id", "operations_id", and "keywords_name" values into one single document in ES.
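To be concrete, given the fields built in the filter above, the document shape I am trying to end up with is something like this (values are illustrative):

{
  "id": 510,
  "title": "...",
  "description_raw": "...",
  "scripts": [
    { "title_raw": "...", "content_raw": "..." }
  ],
  "categories": ["..."],
  "keywords": ["..."],
  "operations": [1, 2, 3],
  "products": [10, 20, 30, 40, 50]
}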
Thank you.