Enriching winlogbeat data with the jdbc_static filter

Hi

I have a MySQL database with the following table structure:

ID, FQDN, IP, SERVICE

I want to enrich the logs received from our Windows servers via winlogbeat. I'd like to add the following fields:

  1. ID => ID
  2. FQDN => host
  3. SERVICE => service

I have the following configuration, which I have not yet gotten to work:

filter {
  if "beats" in [tags] {
    jdbc_static {
      loaders => [
        {
          id => "elkDevIndexAssoc"
          query => "select * from elkDevIndexAssoc"
          local_table => "elkDevIndexAssoc"
        }
      ]
      local_db_objects => [
        {
          name => "elkDevIndexAssoc"
          index_columns => ["cenDevIP"]
          columns => [
            ["cenDevSID", "varchar(255)"],
            ["cenDevFQDN", "varchar(255)"],
            ["cenDevIP", "varchar(255)"],
            ["cenDevServiceName", "varchar(255)"]
          ]
        }
      ]
      local_lookups => [
        {
          id => "localObjects"
          query => "select * from elkDevIndexAssoc WHERE cenDevFQDN = :computer_name"
          parameters => {host => "[cenDevFQDN]"}
          target => "cendotEnhanced"
          tag_on_failure => [ "sql_failure" ]
        }
      ]
      # using add_field here to add & rename values to the event root
      add_field => { cendotFQDN => "%{[cendotEnhanced[0][cendevfqdn]}" }
      add_field => { cendotSID => "%{[cendotEnhanced[0][cendevsid]}" }
      add_field => { cendotServiceName => "%{[cendotEnhanced[0][cendevservicename]}" }
      remove_field => ["cendotEnhanced"]
      jdbc_user => "username"
      jdbc_password => "password"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_driver_library => "/usr/share/java/mysql-connector-java-8.0.11.jar"
      jdbc_connection_string => "jdbc:mysql://84.19.155.71:3306/logstash?serverTimezone=Europe/Stockholm"
      #jdbc_default_timezone => "Europe/Stockholm"
      loader_schedule => "*/5 * * * *"
      add_tag => [ "sql_successful" ]
      tag_on_failure => [ "sql_failure" ]
      #tag_on_default_use => [ "sql_failure" ]
    }
      if ! [cendotFQDN] {
           mutate {
                  add_tag => [ "sql_failure" ]
           }
       }
   }
}

Here I try to get all of the information about a device by searching for its FQDN in our database. In Kibana I saw that the field "computer_name" holds the FQDN of our devices.

query => "select * from elkDevIndexAssoc WHERE cenDevFQDN = :computer_name"

This part is what's failing:

parameters => {host => "[cenDevFQDN]"}

In the Logstash logs I get the following:

[2018-07-30T09:48:04,239][WARN ][logstash.filters.jdbc.lookup] Parameter field not found in event {:lookup_id=>"localObjects", :invalid_parameters=>["[cenDevFQDN]"]}

What am I doing wrong here?

AFAICT the problem lies with your parameters => {host => "[cenDevFQDN]"} line.
Each entry in parameters lines up like this:

  • The left-hand side (you have host) must match the substitution string in your query, minus the colon. Your query uses :computer_name, so the key must be computer_name.
  • The right-hand side must refer to a field in the event coming from winlogbeat, not to a database column. From the docs I presume that field is computer_name. The warning in your log says exactly this: [cenDevFQDN] was not found in the event.

Based on this, I think the parameters line should be:
parameters => {computer_name => "[computer_name]"}

The docs have been updated recently to better explain this.

Perhaps changing your statement and the parameters will make it more obvious (in 6 months' time):

  query => "select * from elkDevIndexAssoc WHERE cenDevFQDN = :substitute_this"
  parameters => {substitute_this => "[computer_name]"}
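
Putting that together, here is a minimal sketch of how the lookup part of the filter could look. It only reuses names from the question plus the substitute_this placeholder from above, and it assumes computer_name really is the event field that carries the FQDN. I have also written the add_field references as [cendotEnhanced][0][...]; the ones in the question look like they are missing a closing bracket after the target name, but that is my reading of the field reference syntax and not something the warning reports.

```
local_lookups => [
  {
    id => "localObjects"
    # :substitute_this is only a placeholder name; it must match the key in parameters
    query => "select * from elkDevIndexAssoc WHERE cenDevFQDN = :substitute_this"
    # left: placeholder name from the query; right: field in the winlogbeat event
    # (assumes the event has a computer_name field holding the FQDN)
    parameters => { substitute_this => "[computer_name]" }
    target => "cendotEnhanced"
    tag_on_failure => [ "sql_failure" ]
  }
]
# each level of the looked-up result gets its own pair of brackets
add_field => { cendotFQDN => "%{[cendotEnhanced][0][cendevfqdn]}" }
add_field => { cendotSID => "%{[cendotEnhanced][0][cendevsid]}" }
add_field => { cendotServiceName => "%{[cendotEnhanced][0][cendevservicename]}" }
remove_field => ["cendotEnhanced"]
```

The rest of the jdbc_static block (loaders, local_db_objects, the jdbc_* settings) should be able to stay as it is in the question.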
