Hi
I have been trying to index data from Postgres to Elasticsearch using logstash.
Scenario:
There is a parent medicine which can have multiple child/variant medicines.
I need to index data for each parent medicine which can contain multiple variant medicines in the form of an array.
I am able to successfully do this but in some cases variants array indexes same variant medicine twice in the array whenever I am updating the parent medicine or variant medicine.
Also, I am using Postgres trigger to update parent_medicine update_at field whenever variant or parent medicine updates as updated_at field being tracked by Logstash for updates.
Logstash configuration:
input {
jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/dbdata"
jdbc_validate_connection => true
jdbc_user => "postgres"
jdbc_password=>"*****"
tracking_column => "latest_updated_at"
use_column_value => true
tracking_column_type => "timestamp"
schedule => "*/2 * * * * *"
statement => "select *,pmm.updated_at as latest_updated_at,pmm.parent_id as pid,pmm.created_at as parent_created_at,pmm.unit_type as parent_unit_type,mm.unit_type as child_unit_type from parent_medicines_master pmm left join medicines_master mm on pmm.parent_id=mm.parent_id where pmm.updated_at > :sql_last_value order by pid"
}
}
filter {
mutate {
copy => { "parent_id" => "[@metadata][_id]"}
remove_field => ["@version"]
}
aggregate {
task_id => "%{parent_id}"
code => "
map['parent_id'] ||= event.get('pid')
map['parent_name'] ||= event.get('parent_name')
map['type'] ||= event.get('type')
map['form_type'] ||= event.get('form_type')
map['unit_type'] ||= event.get('parent_unit_type')
map['generic_salt_id'] ||= event.get('generic_salt_id')
map['medicine_ref'] ||= event.get('medicine_ref')
map['variant_ref_id'] ||= event.get('variant_ref_id')
map['variant_mrp'] ||= event.get('variant_mrp')
map['variant_pack_size'] ||= event.get('variant_pack_size')
map['remarks'] ||= event.get('remarks')
map['parent_medicine_status'] ||= event.get('parent_medicine_status')
map['created_at'] ||= event.get('parent_created_at')
map['updated_at'] ||= event.get('latest_updated_at')
map['manf_id'] ||= event.get('manf_id')
map['images'] ||= event.get('images')
map['parent_unapproved_timestamp'] ||= event.get('parent_unapproved_timestamp')
map['display_name'] ||= event.get('display_name')
map['child_medicines'] ||= []
if event.get('medicine_id')!=nil
then
map['child_medicines'] << {
'parent_id' => event.get('pid'), 'medicine_id' => event.get('medicine_id'), 'medicine_name' => event.get('medicine_name'), 'uip' => event.get('uip'), 'quantity' => event.get('quantity'), 'mrp' => event.get('mrp'), 'reference_url' => event.get('reference_url'), 'medicine_status' => event.get('medicine_status'), 'unapproved_timestamp' => event.get('unapproved_timestamp')
}
end
event.cancel()
"
push_previous_map_as_event => true
timeout => 1
}
}
output {
stdout { codec => json_lines }
elasticsearch {
hosts => ["127.0.0.1:9200"]
user => 'elastic'
password => '*****'
index => "main_medicines_master"
document_type => "_doc"
document_id => "%{parent_id}"
}
}
Wrong Output:
{
"parent_medicine_status": 1,
"parent_unapproved_timestamp": "2019-11-27T07:20:31.622Z",
"form_type": "MR",
"type": "drug",
"display_name": "Test",
"unit_type": "strip",
"variant_ref_id": "",
"variant_mrp": 0,
"medicine_ref": null,
"created_at": "2019-11-15T15:10:20.996Z",
"updated_at": "2019-11-27T12:06:13.867Z",
"manf_id": "6",
"images": " ",
"@timestamp": "2019-11-30T12:36:49.437Z",
"parent_id": "MTPM-1060",
"child_medicines": [
{
"medicine_status": 1,
"parent_id": "MTPM-1060",
"medicine_name": "Test",
"uip": 11,
"quantity": 11,
"mrp": 11,
"medicine_id": "MTMM-1099",
"reference_url": "",
"unapproved_timestamp": "2019-11-27T12:01:16.233Z"
},
{
"medicine_status": 2,
"parent_id": "MTPM-1060",
"medicine_name": "Test",
"uip": 10,
"quantity": 1,
"mrp": 140,
"medicine_id": "MTMM-1061",
"reference_url": "",
"unapproved_timestamp": "2019-11-27T12:01:16.233Z"
},
{
"medicine_status": 1,
"parent_id": "MTPM-1060",
"medicine_name": "Test",
"uip": 11,
"quantity": 11,
"mrp": 11,
"medicine_id": "MTMM-1099",
"reference_url": "",
"unapproved_timestamp": "2019-11-27T12:01:16.233Z"
},
{
"medicine_status": 2,
"parent_id": "MTPM-1060",
"medicine_name": "Test",
"uip": 10,
"quantity": 1,
"mrp": 140,
"medicine_id": "MTMM-1061",
"reference_url": "",
"unapproved_timestamp": "2019-11-27T12:01:16.233Z"
}
],
"variant_pack_size": 0,
"parent_name": "Test",
"generic_salt_id": "GN-48481",
"remarks": null,
"@version": "1",
"unapprovedSince": 77
}
desired output:
{
"parent_medicine_status": 1,
"parent_unapproved_timestamp": "2019-11-27T07:20:31.622Z",
"form_type": "MR",
"type": "drug",
"display_name": "Test",
"unit_type": "strip",
"variant_ref_id": "",
"variant_mrp": 0,
"medicine_ref": null,
"created_at": "2019-11-15T15:10:20.996Z",
"updated_at": "2019-11-27T12:06:13.867Z",
"manf_id": "6",
"images": " ",
"@timestamp": "2019-11-30T12:36:49.437Z",
"parent_id": "MTPM-1060",
"child_medicines": [
{
"medicine_status": 1,
"parent_id": "MTPM-1060",
"medicine_name": "Test",
"uip": 11,
"quantity": 11,
"mrp": 11,
"medicine_id": "MTMM-1099",
"reference_url": "",
"unapproved_timestamp": "2019-11-27T12:01:16.233Z"
},
{
"medicine_status": 2,
"parent_id": "MTPM-1060",
"medicine_name": "Test",
"uip": 10,
"quantity": 1,
"mrp": 140,
"medicine_id": "MTMM-1061",
"reference_url": "",
"unapproved_timestamp": "2019-11-27T12:01:16.233Z"
}
],
"variant_pack_size": 0,
"parent_name": "Test",
"generic_salt_id": "GN-48481",
"remarks": null,
"@version": "1",
"unapprovedSince": 77
}