Accessing hash fields with the JDBC_STATIC filter

Hi,

I am currently receiving logs from windows servers that are shipped with winlogbeat and adding a field from the metadata that winlogbeat is sending, like this:

1001-input-beats.conf:

input {
  beats {
    port => 5044
    id => "beats-input"
    tags => "beats"
     }
}
filter {
    mutate {
                copy => {"[@metadata][ip_address]" => "[beat][ip]"}
        }
 }

And i then want to perform a MySQL lookup like using the beat.ip, like this:

filter {
  if "beats" in [tags] {
    jdbc_static {
      loaders => [
        {
          id => "elkDevIndexAssoc"
          query => "select * from elkDevIndexAssoc"
          local_table => "elkDevIndexAssoc"
        }
      ]
      local_db_objects => [
        {
          name => "elkDevIndexAssoc"
          index_columns => ["cenDevIP"]
          columns => [
            ["cenDevSID", "varchar(255)"],
            ["cenDevFQDN", "varchar(255)"],
            ["cenDevIP", "varchar(255)"],
            ["cenDevServiceName", "varchar(255)"]
          ]
        }
      ]
      local_lookups => [
        {
          id => "localObjects"
          query => "select * from elkDevIndexAssoc WHERE cenDevIP = :host"
          parameters => {host => "%{[beat]}%{[ip]}" }
          target => "cendotEnhanced"
          tag_on_failure => [ "sql_failure" ]
        }
      ]
      # using add_field here to add & rename values to the event root
      add_field => { cendotFQDN => "%{[cendotEnhanced[0][cendevfqdn]}" }
      add_field => { cendotSID => "%{[cendotEnhanced[0][cendevsid]}" }
      add_field => { cendotServiceName => "%{[cendotEnhanced[0][cendevservicename]}" }
      remove_field => ["cendotEnhanced"]
      jdbc_user => "username"
      jdbc_password => "password"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_driver_library => "/usr/share/java/mysql-connector-java-8.0.11.jar"
      jdbc_connection_string => "jdbc:mysql://84.19.155.71:3306/logstash?serverTimezone=Europe/Stockholm"
      #jdbc_default_timezone => "Europe/Stockholm"
      loader_schedule => "*/5 * * * *"
      #add_tag => [ "sql_successful" ]
      #tag_on_failure => [ "sql_failure" ]
      #tag_on_default_use => [ "sql_failure" ]
    }

      if [cendotFQDN] =~ /^%/ {
           mutate {
                  add_tag => [ "sql_failure" ]
                  }
      }
      else {
           mutate {
                  add_tag => [ "sql_successful" ]
           }
       }
   }
}

More specifically:

local_lookups => [
            {
              id => "localObjects"
              query => "select * from elkDevIndexAssoc WHERE cenDevIP = :host"
              parameters => {host => "%{[beat]}%{[ip]}" }
              target => "cendotEnhanced"
              tag_on_failure => [ "sql_failure" ]
            }
          ]

However this lookup fails and the fields that i'm adding are instead populated with:

cendotFQDN = %{[cendotEnhanced[0][cendevfqdn]}

I think what i'm doing wrong is accessing the hash fields wrong, what is the correct way?

Thanks

Maybe it is the format of the beat.ip field?

Did you try using:

parameters => {host => "%{[beat][ip]}" }

Instead of:

parameters => {host => "%{[beat]}%{[ip]}" }

??

1 Like

Thank you sir, i sometimes get blind to the most obvious solutions when stuck trying to find a fix for a problem for a long time :slight_smile: Thanks!

No problem! :grinning:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.