Logstash Aggregate - pushing same data multiple times in nested array json

Hi
I have been trying to index data from Postgres to Elasticsearch using logstash.

Scenario:
There is a parent medicine which can have multiple child/variant medicines.
I need to index data for each parent medicine which can contain multiple variant medicines in the form of an array.

I am able to successfully do this but in some cases variants array indexes same variant medicine twice in the array whenever I am updating the parent medicine or variant medicine.

Also, I am using Postgres trigger to update parent_medicine update_at field whenever variant or parent medicine updates as updated_at field being tracked by Logstash for updates.

Logstash configuration:

input {
  jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/dbdata"
jdbc_validate_connection => true
jdbc_user => "postgres"
jdbc_password=>"*****"
tracking_column => "latest_updated_at"
use_column_value => true
tracking_column_type => "timestamp"
schedule => "*/2 * * * * *"
statement => "select *,pmm.updated_at as latest_updated_at,pmm.parent_id as pid,pmm.created_at as parent_created_at,pmm.unit_type as parent_unit_type,mm.unit_type as child_unit_type from parent_medicines_master pmm left join medicines_master mm on pmm.parent_id=mm.parent_id where pmm.updated_at > :sql_last_value order by pid"
  }
}

filter {
mutate {
    copy => { "parent_id" => "[@metadata][_id]"}
    remove_field => ["@version"]
}
aggregate {
    task_id => "%{parent_id}"
    code => "
        map['parent_id'] ||= event.get('pid')
        map['parent_name'] ||= event.get('parent_name')
        map['type'] ||= event.get('type')
        map['form_type'] ||= event.get('form_type')
        map['unit_type'] ||= event.get('parent_unit_type')
        map['generic_salt_id'] ||= event.get('generic_salt_id')
        map['medicine_ref'] ||= event.get('medicine_ref')
        map['variant_ref_id'] ||= event.get('variant_ref_id')
        map['variant_mrp'] ||= event.get('variant_mrp')
        map['variant_pack_size'] ||= event.get('variant_pack_size')
      map['remarks'] ||= event.get('remarks')
      map['parent_medicine_status'] ||= event.get('parent_medicine_status')
      map['created_at'] ||= event.get('parent_created_at')
      map['updated_at'] ||= event.get('latest_updated_at')
      map['manf_id'] ||= event.get('manf_id')
	  map['images'] ||= event.get('images')
	  map['parent_unapproved_timestamp'] ||= event.get('parent_unapproved_timestamp')
	  map['display_name'] ||= event.get('display_name')
      map['child_medicines'] ||= []
	  if event.get('medicine_id')!=nil
      then
         map['child_medicines'] << {
              'parent_id' => event.get('pid'), 'medicine_id' => event.get('medicine_id'), 'medicine_name' => event.get('medicine_name'), 'uip' => event.get('uip'), 'quantity' => event.get('quantity'), 'mrp' => event.get('mrp'), 'reference_url' => event.get('reference_url'), 'medicine_status' => event.get('medicine_status'), 'unapproved_timestamp' => event.get('unapproved_timestamp')
          }
      end
      event.cancel()
    "
    push_previous_map_as_event => true
	timeout => 1
}
}

output {
stdout { codec => json_lines }
elasticsearch {
	hosts => ["127.0.0.1:9200"]
    user => 'elastic'
    password => '*****'
    index => "main_medicines_master"
    document_type => "_doc"
    document_id => "%{parent_id}"
}
}

Wrong Output:

 {
                "parent_medicine_status": 1,
                "parent_unapproved_timestamp": "2019-11-27T07:20:31.622Z",
                "form_type": "MR",
                "type": "drug",
                "display_name": "Test",
                "unit_type": "strip",
                "variant_ref_id": "",
                "variant_mrp": 0,
                "medicine_ref": null,
                "created_at": "2019-11-15T15:10:20.996Z",
                "updated_at": "2019-11-27T12:06:13.867Z",
                "manf_id": "6",
                "images": " ",
                "@timestamp": "2019-11-30T12:36:49.437Z",
                "parent_id": "MTPM-1060",
                "child_medicines": [
                    {
                        "medicine_status": 1,
                        "parent_id": "MTPM-1060",
                        "medicine_name": "Test",
                        "uip": 11,
                        "quantity": 11,
                        "mrp": 11,
                        "medicine_id": "MTMM-1099",
                        "reference_url": "",
                        "unapproved_timestamp": "2019-11-27T12:01:16.233Z"
                    },
                    {
                        "medicine_status": 2,
                        "parent_id": "MTPM-1060",
                        "medicine_name": "Test",
                        "uip": 10,
                        "quantity": 1,
                        "mrp": 140,
                        "medicine_id": "MTMM-1061",
                        "reference_url": "",
                        "unapproved_timestamp": "2019-11-27T12:01:16.233Z"
                    },
                {
                        "medicine_status": 1,
                        "parent_id": "MTPM-1060",
                        "medicine_name": "Test",
                        "uip": 11,
                        "quantity": 11,
                        "mrp": 11,
                        "medicine_id": "MTMM-1099",
                        "reference_url": "",
                        "unapproved_timestamp": "2019-11-27T12:01:16.233Z"
                    },
                    {
                        "medicine_status": 2,
                        "parent_id": "MTPM-1060",
                        "medicine_name": "Test",
                        "uip": 10,
                        "quantity": 1,
                        "mrp": 140,
                        "medicine_id": "MTMM-1061",
                        "reference_url": "",
                        "unapproved_timestamp": "2019-11-27T12:01:16.233Z"
                    }
                ],
                "variant_pack_size": 0,
                "parent_name": "Test",
                "generic_salt_id": "GN-48481",
                "remarks": null,
                "@version": "1",
                "unapprovedSince": 77
            }

desired output:

        {
            "parent_medicine_status": 1,
            "parent_unapproved_timestamp": "2019-11-27T07:20:31.622Z",
            "form_type": "MR",
            "type": "drug",
            "display_name": "Test",
            "unit_type": "strip",
            "variant_ref_id": "",
            "variant_mrp": 0,
            "medicine_ref": null,
            "created_at": "2019-11-15T15:10:20.996Z",
            "updated_at": "2019-11-27T12:06:13.867Z",
            "manf_id": "6",
            "images": " ",
            "@timestamp": "2019-11-30T12:36:49.437Z",
            "parent_id": "MTPM-1060",
            "child_medicines": [
                {
                    "medicine_status": 1,
                    "parent_id": "MTPM-1060",
                    "medicine_name": "Test",
                    "uip": 11,
                    "quantity": 11,
                    "mrp": 11,
                    "medicine_id": "MTMM-1099",
                    "reference_url": "",
                    "unapproved_timestamp": "2019-11-27T12:01:16.233Z"
                },
                {
                    "medicine_status": 2,
                    "parent_id": "MTPM-1060",
                    "medicine_name": "Test",
                    "uip": 10,
                    "quantity": 1,
                    "mrp": 140,
                    "medicine_id": "MTMM-1061",
                    "reference_url": "",
                    "unapproved_timestamp": "2019-11-27T12:01:16.233Z"
                }
            ],
            "variant_pack_size": 0,
            "parent_name": "Test",
            "generic_salt_id": "GN-48481",
            "remarks": null,
            "@version": "1",
            "unapprovedSince": 77
        }

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.