Nested aggregation

Hello guys,

I am trying to aggregate some data from my database with Logstash and send it to Elasticsearch.

My data looks like this:
[screenshot of the source table]

The table consists of 4 columns. Each question_id may have one or more answer_id, each answer_id has one doctor_id, and each doctor_id may have one or more speciality_id.
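
For illustration, the flat rows for one question might look like this (values taken from the example output below):

    question_id,answer_id,doctor_id,speciality_id
    1327562,804320,97426,68
    1327562,804320,97426,99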

I want to aggregate the data by question_id in the following way:

{
	"question_id": 1327562,
	"answers": [
	    {
	      "answer_id": 804320,
	      "doctor_id": 97426,
	      "doctor_specialities":
	      [
	        {"specialty_id": 68}, {"specialty_id": 99}
	      ]
	    }
	]
}

Thank you!

You want the event for each question to contain an array of answers; however, to get there you need to build that array as a hash keyed by answer_id. Once the structure is complete you convert the hash back to an array by throwing away the keys.
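
As a tiny illustration of that last step (plain Ruby, values are just for the example):

    answers = { 804320 => { "answer_id" => 804320, "doctor_id" => 97426 } }
    # .to_a turns the hash into an array of [key, value] pairs
    answers.to_a                    # => [ [ 804320, { "answer_id" => 804320, "doctor_id" => 97426 } ] ]
    # keeping only the values drops the answer_id keys
    answers.to_a.map { |k, v| v }   # => [ { "answer_id" => 804320, "doctor_id" => 97426 } ]

The ruby filter at the end of the configuration below does exactly that conversion.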

    csv { columns => [ "question_id", "answer_id", "doctor_id", "speciality_id" ] }
    aggregate {
        task_id => "%{question_id}"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "question_id"
        timeout => 5
        code => '
            answer_id = event.get("answer_id")
            doctor_id = event.get("doctor_id")
            speciality_id = event.get("speciality_id")

            map["answers"] ||= {}

            if ! map["answers"][answer_id]
                map["answers"][answer_id] = { "answer_id" => answer_id,
                                                "doctor_id" => doctor_id,
                                                "doctor_specialities" => [ {"specialty_id" => speciality_id } ] }
            else
                # Assume it is the same doctor
                map["answers"][answer_id]["doctor_specialities"] << {"specialty_id": speciality_id }
            end
            event.cancel
        '
    }
    ruby {
        code => '
        # This converts a hash { "key" => "value" } to an array of pairs [ [ "key", "value" ] ]
        answers = event.get("answers").to_a
        # Next we throw away the keys since those are also in the value hash
        newAnswers = []
        answers.each { |x|
            newAnswers << x[1]
        }
        event.set("answers", newAnswers)
        '
    }
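
With rows like the two sketched in the question (one answer, two specialities), the event flushed on timeout should come out roughly like this after the ruby filter. This is illustrative, not captured output: Logstash will also add @timestamp and @version, and with the csv filter the ids will be strings unless you add convert options.

    {
        "question_id" => 1327562,
        "answers" => [
            {
                "answer_id" => 804320,
                "doctor_id" => 97426,
                "doctor_specialities" => [
                    { "specialty_id" => 68 },
                    { "specialty_id" => 99 }
                ]
            }
        ]
    }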

event.cancel deletes the source event, so only the aggregated data is emitted. When the aggregate filter times out, the map is flushed into the pipeline as a new event, which then goes through the ruby filter.

Note that this is just an outline of an approach; you will need to add error checking.
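
One more thing to keep in mind: the aggregate filter relies on all rows for a given question_id passing through the same filter instance in order, so the pipeline must run with a single worker. A minimal way to set that:

    # in logstash.yml, or pass -w 1 / --pipeline.workers 1 on the command line
    pipeline.workers: 1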

Hi Badger,

Thank you for your reply

I tried your solution and got no output.

This is my code.

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.25.jar"
    jdbc_driver_class => "${JDBC_DRIVER_CLASS}"
    jdbc_connection_string => "${JDBC_CONNECTION_STRING}"
    jdbc_user => "${JDBC_USER}"
    jdbc_password => "${JDBC_PASSWORD}"
    statement_filepath => "/usr/share/logstash/config/queries/incremental.sql"
  }
}

filter {
    aggregate {
        task_id => "%{question_id}"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "question_id"
        timeout => 5
        code => '
            answer_id = event.get("answer_id")
            doctor_id = event.get("doctor_id")
            speciality_id = event.get("speciality_id")

            map["answers"] ||= {}

            if ! map["answers"][answer_id]
                map["answers"][answer_id] = { "answer_id" => answer_id,
                                                "doctor_id" => doctor_id,
                                                "doctor_specialities" => [ {"specialty_id" => speciality_id } ] }
            else
                # Assume it is the same doctor
                map["answers"][answer_id]["doctor_specialities"] << {"specialty_id": speciality_id }
            end
            event.cancel
        '
    }
    ruby {
        code => '
        # This converts a hash { "key" => "value" } to an array of pairs [ [ "key", "value" ] ]
        answers = event.get("answers").to_a
        # Next we throw away the keys since those are also in the value hash
        newAnswers = []
        answers.each { |x|
            newAnswers << x[1]
        }
        event.set("answers", newAnswers)
        '
    }

}
output {
  stdout {
    codec => rubydebug
  }
}

And when I removed event.cancel, I got this output:

logstash_1       | {
logstash_1       |          "@version" => "1",
logstash_1       |       "question_id" => 8548,
logstash_1       |           "answers" => [],
logstash_1       |         "answer_id" => 236955,
logstash_1       |         "doctor_id" => 115132,
logstash_1       |     "speciality_id" => 24,
logstash_1       |        "@timestamp" => 2021-11-02T15:16:44.896Z
logstash_1       | }
logstash_1       | {
logstash_1       |          "@version" => "1",
logstash_1       |       "question_id" => 8548,
logstash_1       |           "answers" => [],
logstash_1       |         "answer_id" => 237095,
logstash_1       |         "doctor_id" => 95751,
logstash_1       |     "speciality_id" => 24,
logstash_1       |        "@timestamp" => 2021-11-02T15:16:44.897Z
logstash_1       | }
logstash_1       | {
logstash_1       |          "@version" => "1",
logstash_1       |       "question_id" => 8548,
logstash_1       |           "answers" => [],
logstash_1       |         "answer_id" => 237998,
logstash_1       |         "doctor_id" => 95751,
logstash_1       |     "speciality_id" => 24,
logstash_1       |        "@timestamp" => 2021-11-02T15:16:44.898Z
logstash_1       | }
logstash_1       | {
logstash_1       |          "@version" => "1",
logstash_1       |       "question_id" => 8548,
logstash_1       |           "answers" => [],
logstash_1       |         "answer_id" => 241824,
logstash_1       |         "doctor_id" => 96780,
logstash_1       |     "speciality_id" => 24,
logstash_1       |        "@timestamp" => 2021-11-02T15:16:44.898Z
logstash_1       | }

That sounds like the timeout is not occurring. Perhaps the pipeline has stopped, so the timeout never fires. You could try adding a file input that reads a file containing no data; that will keep the pipeline running even after the jdbc input has finished.
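
For example, something along these lines in the input section (the file path is just a placeholder for an empty file that exists on the Logstash host):

    input {
      jdbc {
        # ... existing jdbc settings from the config above ...
      }

      # Reads an empty file: it produces no events but keeps the pipeline
      # running so the aggregate timeout can fire.
      file {
        path => "/usr/share/logstash/config/keepalive.empty"
      }
    }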
