Logstash pipeline Elasticsearch filter if conditional not working

Hello,

I am using the following configuration to sync some data into my Elasticsearch indeces:

logstashPipeline:
      logstash.conf: |
        input {
          jdbc {
            jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
            jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
            jdbc_connection_string => "<connection string>"
            jdbc_user => "${username}"
            jdbc_password => "${password}"
            sequel_opts => {
              jdbc_properties => {
                "encryptionAlgorithm" => "2"
                "securityMechanism" => "7"
              }
            }
            schedule => "30 22 * * * Etc/GMT"
            statement => "SELECT plant_name, plant_number from ELASTIC_POC_PLANT"
            tags => "plant"
          }
          jdbc {
            jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
            jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
            jdbc_connection_string => "<connection string>"
            jdbc_user => "${username}"
            jdbc_password => "${password}"
            sequel_opts => {
              jdbc_properties => {
                "encryptionAlgorithm" => "2"
                "securityMechanism" => "7"
              }
            }
            schedule => "45 22 * * * Etc/GMT"
            statement => "SELECT id, index, number, name, city from ELASTIC_POC_SUPPLIER"
            tags => "supplier"
          }
          jdbc {
            jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
            jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
            jdbc_connection_string => "<connection string>"
            jdbc_user => "${username}"
            jdbc_password => "${password}"
            sequel_opts => {
              jdbc_properties => {
                "encryptionAlgorithm" => "2"
                "securityMechanism" => "7"
              }
            }
            schedule => "15 23 * * * Etc/GMT"
            statement => "SELECT part_id, p_part_number_base, part_number, p_part_number_es1, part_number_es2, partname, color_designation from ELASTIC_POC_PART"
            tags => "part"
          }
        }
        output {
          if "supplier" in [tags] {
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "supplier"
              document_id => "%{id}"
            }
          }
          if "plant" in [tags] {
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "plant"
              document_id => "%{plant_number}"
            }
          }
          if "part" in [tags] {
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "part"
              document_id => "%{part_id}"
            }
          }
        }
 
      complaint.conf: |
        input {
          jdbc {
            jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
            jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
            jdbc_connection_string => "<connection string>"
            jdbc_user => "${username}"
            jdbc_password => "${password}"
            sequel_opts => {
              jdbc_properties => {
                "encryptionAlgorithm" => "2"
                "securityMechanism" => "7"
              }
            }
            schedule => "15 1 * * * Etc/GMT"
            statement => "SELECT id, status, part_number, supplier_id, fault_description, creator_id, modifier_id, editor_supplier_id, editor_id, modified_at, created_at, closed_at, plant_number, part_number, p_part_number_base from complaint"
            tags => "complaint"
          }
        }
        filter {
          if "complaint" in [tags] {
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "supplier"
              query => "id:%{[supplier_id]}"
              fields => {
                "name" => "[supplier][name]"
                "index" => "[supplier][supplierIndex]"
                "number" => "[supplier][supplierNumber]"
                "city" => "[supplier][city]"
              }
            }
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "plant"
              query => "plant_number:%{[plant_number]}"
              fields => {
                "plant_name" => "[comp][plant]"
              }
            }
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "part"
              query => "p_part_number_base:%{[p_part_number_base]} AND part_number:%{[part_number]}"
              fields => {
                "p_part_number_base" => "[part][partNumberBase]"
                "part_number" => "[part][partNumber]"
                "p_part_number_es1" => "[part][es1]"
                "p_part_number_es2" => "[part][es2]"
                "partname" => "[part][partName]"
                "color_designation" => "[part][colorDesignation]"
              }
            }
          }
          mutate {
            rename => {
              "id" => "[comp][complaintId]"
              "status" => "[comp][status]"
              "plant_number" => "[comp][plantNumber]"
              "supplier_id" => "[comp][supplierId]"
              "fault_description" => "[comp][faultDescription]"
              "creator_id" => "[comp][creatorId]"
              "modifier_id" => "[comp][modifierId]"
              "editor_supplier_id" => "[comp][editorSupplierId]"
              "editor_id" => "[comp][editorId]"
              "modified_at" => "[comp][modifiedAt]"
              "created_at" => "[comp][createdAt]"
              "closed_at" => "[comp][closedAt]"
            }
            remove_field => ["supplier_id", "supplier_data", "plant_data", "part_number", "p_part_number_base", "part_data", "@version", "@timestamp"]
          }
        }
        output {
          if "complaint" in [tags] {
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "complaint"
              document_id => "%{[comp][complaintId]}"
            }
          }
        }

It is working, however the "if" conditional is misbehaving, meaning for the supplier_masterdata for example it enters the "complaint" section somehow and adds a [comp][complaintId] to the data, I have checked it with rubydebug.

Am I doing something wrong with the tagging or using the if conditional improperly?

Any help would be much appreciated...

Never mind, my bad, the mutate part wasn't included in the proper if statement and hence every event got mutated accordingly.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.