Hello,
I am using the following configuration to sync some data into my Elasticsearch indeces:
logstashPipeline:
logstash.conf: |
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
jdbc_connection_string => "<connection string>"
jdbc_user => "${username}"
jdbc_password => "${password}"
sequel_opts => {
jdbc_properties => {
"encryptionAlgorithm" => "2"
"securityMechanism" => "7"
}
}
schedule => "30 22 * * * Etc/GMT"
statement => "SELECT plant_name, plant_number from ELASTIC_POC_PLANT"
tags => "plant"
}
jdbc {
jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
jdbc_connection_string => "<connection string>"
jdbc_user => "${username}"
jdbc_password => "${password}"
sequel_opts => {
jdbc_properties => {
"encryptionAlgorithm" => "2"
"securityMechanism" => "7"
}
}
schedule => "45 22 * * * Etc/GMT"
statement => "SELECT id, index, number, name, city from ELASTIC_POC_SUPPLIER"
tags => "supplier"
}
jdbc {
jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
jdbc_connection_string => "<connection string>"
jdbc_user => "${username}"
jdbc_password => "${password}"
sequel_opts => {
jdbc_properties => {
"encryptionAlgorithm" => "2"
"securityMechanism" => "7"
}
}
schedule => "15 23 * * * Etc/GMT"
statement => "SELECT part_id, p_part_number_base, part_number, p_part_number_es1, part_number_es2, partname, color_designation from ELASTIC_POC_PART"
tags => "part"
}
}
output {
if "supplier" in [tags] {
elasticsearch {
hosts => ["http://complaint-elk-elasticsearch-master:9200"]
index => "supplier"
document_id => "%{id}"
}
}
if "plant" in [tags] {
elasticsearch {
hosts => ["http://complaint-elk-elasticsearch-master:9200"]
index => "plant"
document_id => "%{plant_number}"
}
}
if "part" in [tags] {
elasticsearch {
hosts => ["http://complaint-elk-elasticsearch-master:9200"]
index => "part"
document_id => "%{part_id}"
}
}
}
complaint.conf: |
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
jdbc_connection_string => "<connection string>"
jdbc_user => "${username}"
jdbc_password => "${password}"
sequel_opts => {
jdbc_properties => {
"encryptionAlgorithm" => "2"
"securityMechanism" => "7"
}
}
schedule => "15 1 * * * Etc/GMT"
statement => "SELECT id, status, part_number, supplier_id, fault_description, creator_id, modifier_id, editor_supplier_id, editor_id, modified_at, created_at, closed_at, plant_number, part_number, p_part_number_base from complaint"
tags => "complaint"
}
}
filter {
if "complaint" in [tags] {
elasticsearch {
hosts => ["http://complaint-elk-elasticsearch-master:9200"]
index => "supplier"
query => "id:%{[supplier_id]}"
fields => {
"name" => "[supplier][name]"
"index" => "[supplier][supplierIndex]"
"number" => "[supplier][supplierNumber]"
"city" => "[supplier][city]"
}
}
elasticsearch {
hosts => ["http://complaint-elk-elasticsearch-master:9200"]
index => "plant"
query => "plant_number:%{[plant_number]}"
fields => {
"plant_name" => "[comp][plant]"
}
}
elasticsearch {
hosts => ["http://complaint-elk-elasticsearch-master:9200"]
index => "part"
query => "p_part_number_base:%{[p_part_number_base]} AND part_number:%{[part_number]}"
fields => {
"p_part_number_base" => "[part][partNumberBase]"
"part_number" => "[part][partNumber]"
"p_part_number_es1" => "[part][es1]"
"p_part_number_es2" => "[part][es2]"
"partname" => "[part][partName]"
"color_designation" => "[part][colorDesignation]"
}
}
}
mutate {
rename => {
"id" => "[comp][complaintId]"
"status" => "[comp][status]"
"plant_number" => "[comp][plantNumber]"
"supplier_id" => "[comp][supplierId]"
"fault_description" => "[comp][faultDescription]"
"creator_id" => "[comp][creatorId]"
"modifier_id" => "[comp][modifierId]"
"editor_supplier_id" => "[comp][editorSupplierId]"
"editor_id" => "[comp][editorId]"
"modified_at" => "[comp][modifiedAt]"
"created_at" => "[comp][createdAt]"
"closed_at" => "[comp][closedAt]"
}
remove_field => ["supplier_id", "supplier_data", "plant_data", "part_number", "p_part_number_base", "part_data", "@version", "@timestamp"]
}
}
output {
if "complaint" in [tags] {
elasticsearch {
hosts => ["http://complaint-elk-elasticsearch-master:9200"]
index => "complaint"
document_id => "%{[comp][complaintId]}"
}
}
}
It is working, however the "if" conditional is misbehaving, meaning for the supplier_masterdata for example it enters the "complaint" section somehow and adds a [comp][complaintId] to the data, I have checked it with rubydebug.
Am I doing something wrong with the tagging or using the if conditional improperly?
Any help would be much appreciated...