I am trying to aggregate some data from my database with Logstash and send it to Elasticsearch.
My data looks like this:
The table has 4 columns. Each question_id may have one or more answer_ids, each answer_id has one doctor_id, and each doctor_id may have one or more speciality_ids.
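For example (made-up values, just to illustrate the shape of the rows):

    question_id,answer_id,doctor_id,speciality_id
    1,1,100,7
    1,1,100,8
    1,2,101,7
    2,3,100,7

Here question 1 has two answers, and the doctor on answer 1 has two specialities, so that answer spans two rows.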
I want to aggregate data by question_id in the following way
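That is, each question should become a single document shaped roughly like this (illustrative values):

    {
        "question_id": 1,
        "answers": [
            { "answer_id": 1, "doctor_id": 100,
              "doctor_specialities": [ { "specialty_id": 7 }, { "specialty_id": 8 } ] },
            { "answer_id": 2, "doctor_id": 101,
              "doctor_specialities": [ { "specialty_id": 7 } ] }
        ]
    }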
You want the question event to contain an array of answers; however, to get there you need to build it as a hash keyed by answer_id, so that repeated rows for the same answer can be merged. Once the structure is complete you convert the hash back to an array by throwing away those keys.
csv { columns => [ "question_id", "answer_id", "doctor_id", "speciality_id" ] }
aggregate {
    task_id => "%{question_id}"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "question_id"
    timeout => 5
    code => '
        answer_id = event.get("answer_id")
        doctor_id = event.get("doctor_id")
        speciality_id = event.get("speciality_id")

        # Collect answers in a hash keyed by answer_id
        map["answers"] ||= {}
        if ! map["answers"][answer_id]
            map["answers"][answer_id] = { "answer_id" => answer_id,
                                          "doctor_id" => doctor_id,
                                          "doctor_specialities" => [ { "specialty_id" => speciality_id } ] }
        else
            # Assume it is the same doctor, so only append the speciality
            map["answers"][answer_id]["doctor_specialities"] << { "specialty_id" => speciality_id }
        end
        event.cancel
    '
}
ruby {
    code => '
        # event.get("answers") is a hash keyed by answer_id. to_a turns it
        # into an array of [ key, value ] pairs; we keep only the values,
        # since answer_id is already inside each value hash.
        answers = event.get("answers").to_a
        new_answers = []
        answers.each { |x|
            new_answers << x[1]
        }
        event.set("answers", new_answers)
    '
}
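Equivalently, since only the hash's values are needed, Ruby's Hash#values does the same conversion in one step:

    ruby { code => 'event.set("answers", event.get("answers").values)' }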
The event.cancel deletes the source events, so only the aggregated result moves on. When the aggregate filter times out, the map is flushed into the pipeline as a new event, which then goes through the ruby filter.
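With the illustrative rows above, the flushed event carries answers as a hash, and the ruby filter strips the keys (note the csv filter leaves the values as strings):

    # As flushed by the aggregate timeout:
    "answers" => { "1" => { "answer_id" => "1", "doctor_id" => "100",
                            "doctor_specialities" => [ { "specialty_id" => "7" }, { "specialty_id" => "8" } ] },
                   "2" => { "answer_id" => "2", "doctor_id" => "101",
                            "doctor_specialities" => [ { "specialty_id" => "7" } ] } }

    # After the ruby filter:
    "answers" => [ { "answer_id" => "1", ... }, { "answer_id" => "2", ... } ]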
Note that this is just an outline of an approach; you will need to add error checking.
That sounds like the timeout is not firing, perhaps because the pipeline has stopped before it could occur. You could try adding a file input that reads a file containing no data; that will keep the pipeline running even after the jdbc input has finished.
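A minimal sketch of that keep-alive input (the path is hypothetical; the file just has to exist and can be empty):

    input {
        file {
            path => "/tmp/keepalive.txt"    # any existing file; it never needs data
            sincedb_path => "/dev/null"     # do not persist the read position
        }
    }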