How to join fields from multiple events into a single event?

Hi, I feel like my issue should be a common one, but I have spent days trying to find an answer, to no avail...

Here is what I am trying to accomplish: I have data in two separate databases that share a common primary key. I am using the JDBC input plugin to pull the data in as events. I need to combine the fields based on that ID and then output the result to Elasticsearch. How can I accomplish this? So far I have the following structure in my Logstash config:

input {
    jdbc { datasource1... }
}

input {
   jdbc { datasource2... }
}

filter {
  aggregate {
     # using example #4 from the filter plugin page
  }
}

output {
  elasticsearch { ... }
}

The aggregate filter will only work if my DB results are ordered so that rows with the same primary key come right after one another, and pulling from two datasources doesn't work that way. I wish I didn't need the filter at all and there were some join that could be done before outputting.
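
For reference, here is roughly what that aggregate attempt looks like. This is a minimal sketch: it assumes every field can simply be folded into the shared map, the timeout is arbitrary, and the filter only behaves correctly with pipeline.workers set to 1:

filter {
    aggregate {
        task_id => "%{companyId}"
        # fold all fields from this event into the map, then drop the event
        code => "map.merge!(event.to_hash); event.cancel()"
        # the merged map is only emitted once a different companyId arrives,
        # which is exactly why the input must be sorted by companyId
        push_previous_map_as_event => true
        timeout => 60
    }
}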

Thank you in advance for looking into this!

  • Dan

Logstash isn't stateful in a way that can handle this.
You will need two steps: grab the data from DB1 and put it into Elasticsearch in a temporary index, then pull the second DB's dataset and do a lookup against the temp index to add the values from the first set.

Thanks for the suggestion, I applied it today and it is a good workaround. I may end up executing a stored procedure (if possible) instead and doing the join in there, but for now this solves my issue =)
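
If I do go the stored procedure route, the idea would be to let SQL Server perform the join and have a single jdbc input call it. A rough sketch, where dbo.GetJoinedCompanyData is a hypothetical procedure name and the connection settings mirror the ones below:

input {
    jdbc {
        jdbc_driver_library => "/etc/logstash/resources/drivers/sqljdbc42.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => (server info)
        jdbc_user => (username)
        jdbc_password => (password)
        schedule => "0 0 * * *"
        # hypothetical stored procedure that performs the join server-side
        statement => "EXEC dbo.GetJoinedCompanyData"
    }
}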

Would you be willing to share the relevant config sections? I am sure someone else will find it useful! :smiley:

Sure thing. This was used for storing company data.

Config 1:

input {
    jdbc {
        id => "internal_data_plugin"
        jdbc_driver_library => "/etc/logstash/resources/drivers/sqljdbc42.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => (server info)
        jdbc_user => (username)
        jdbc_password => (password)
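        # cron schedule: run once a day at midnight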
        schedule => "0 0 * * *"
        statement_filepath => "/etc/logstash/resources/statements/internal-data.sql"
        connection_retry_attempts => 3
        connection_retry_attempts_wait_time => 10
    }
}

output {
    elasticsearch {
        hosts => ["127.0.0.1"]
        user => (username)
        password => (password)
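        # stage the rows in a temporary index, one document per companyId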
        index => "temp"
        document_type => "company"
        document_id => "%{companyId}"
    }
}

Config 2:

input {
    jdbc {
        id => "universal_data_plugin"
        jdbc_driver_library => "/etc/logstash/resources/drivers/sqljdbc42.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => (server info)
        jdbc_user => (username)
        jdbc_password => (password)
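        # run at 1 AM, an hour after the first pipeline refreshes the temp index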
        schedule => "0 1 * * *"
        statement_filepath => "/etc/logstash/resources/statements/universal-data.sql"
        connection_retry_attempts => 3
        connection_retry_attempts_wait_time => 10
    }
}

filter {
    elasticsearch {
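        # for each event, look up the matching temp document by companyId
        # and copy the listed fields onto the event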
        hosts => ["127.0.0.1"]
        user => (username)
        password => (password)
        index => "temp"
        query => "companyId:%{companyId}"
        fields => {
            "transactionDate" => "transactionDate"
            "totalAssets" => "totalAssets"
        }
    }
}

output {
    elasticsearch {
        hosts => ["127.0.0.1"]
        user => (username)
        password => (password)
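        # write the joined documents to the final index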
        index => "project"
        document_type => "company"
        document_id => "%{companyId}"
        template => "/etc/logstash/resources/templates/project-template.json"
        template_name => "project"
        template_overwrite => true
    }
}
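
One thing worth pointing out: the second pipeline is scheduled an hour after the first, so the temp index has already been refreshed by the time the lookup runs.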