How to add a tag if device lookup using JDBC fails?

Hi

We have a MySQL database with all of our devices stored as a table with an ID, FQDN, IP and a service name. Now we want to have some assurance that if the device that is sending logs to our ELK stack isn't in our database it will send its logs to a separate index called "unknown".

Does logstash or the JDBC plugin have any method of adding a tag or something like that if the device lookup fails or if the values returned are zero/null?

Here is our configuration:

filter {
  if "syslog" in [tags] {
    jdbc_static {
      loaders => [
        {
          id => "elkDevIndexAssoc"
          query => "select * from elkDevIndexAssoc"
          local_table => "elkDevIndexAssoc"
        }
      ]
      local_db_objects => [
        {
          name => "elkDevIndexAssoc"
          index_columns => ["cenDevIP"]
          columns => [
            ["cenDevSID", "varchar(255)"],
            ["cenDevFQDN", "varchar(255)"],
            ["cenDevIP", "varchar(255)"],
            ["cenDevServiceName", "varchar(255)"]
          ]
        }
      ]
      local_lookups => [
        {
          id => "localObjects"
          query => "select * from elkDevIndexAssoc WHERE cenDevIP = :host"
          parameters => {host => "[host]"}
          target => "cendotEnhanced"
        }
      ]
      # using add_field here to add & rename values to the event root
      add_field => { cendotFQDN => "%{[cendotEnhanced[0][cendevfqdn]}" }
      add_field => { cendotSID => "%{[cendotEnhanced[0][cendevsid]}" }
      add_field => { cendotServiceName => "%{[cendotEnhanced[0][cendevservicename]}" }
      remove_field => ["cendotEnhanced"]
      jdbc_user => "username"
      jdbc_password => "password"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_driver_library => "/usr/share/java/mysql-connector-java-8.0.11.jar"
      jdbc_connection_string => "jdbc:mysql://84.19.155.71:3306/logstash?serverTimezone=Europe/Stockholm"
      #jdbc_default_timezone => "Europe/Stockholm"
    }
  }
}

Perhaps something like this:

filter {
	...

	if ! [cendotFQDN] {
		mutate {
			add_tag => [ "TAG" ]
		}
	}
}

Docs -> tag_on_failure


Hi, I tried adding that in the following way:

      add_field => { cendotFQDN => "%{[cendotEnhanced[0][cendevfqdn]}" }
      add_field => { cendotSID => "%{[cendotEnhanced[0][cendevsid]}" }
      add_field => { cendotServiceName => "%{[cendotEnhanced[0][cendevservicename]}" }
      remove_field => ["cendotEnhanced"]
      jdbc_user => "username"
      jdbc_password => "password"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_driver_library => "/usr/share/java/mysql-connector-java-8.0.11.jar"
      jdbc_connection_string => "jdbc:mysql://123.123.123.123:3306/logstash?serverTimezone=Europe/Stockholm"
      #jdbc_default_timezone => "Europe/Stockholm"
      loader_schedule => "*/5 * * * *"
      #add_tag => [ "sql_successful" ] #this works
      #tag_on_failure => [ "sql_failure" ] #this does nothing

However, nothing gets tagged with "sql_failure" even though they don't have any entries in our MySQL database. I added the following output to monitor if anything gets tagged with "sql_failure" but it shows nothing.

else if "sql_failure" in [tags] {
   stdout { codec => rubydebug }
}

EDIT: I think that the tag never gets applied because it technically never "fails", it just doesn't add anything to the fields?

So I sent the dead_letter_queue to stdout and this is what I got. It still gets the tag "syslog" but not "sql_failure" that should be added with the jdbc plugin. Can anyone help remedy this? Would it be possible to have a conditional to check for these kinds of messages and send them to a separate index?

    Jul 27 07:50:40 sealijvblog01 logstash[39452]:         }
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:     },
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:                  "host" => "123.123.123.123",
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:              "@version" => "1",
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:               "message" => "<167>2018-07-05T11:03:11.122Z ivcl00esxi03 Vpxa: verbose vpxa[2D264B70] [Originator@6876 sub=hostdvm opID=WFU-7bfc9bfc] [VpxaHalVmHostagent] 2: GuestInfo changed 'guest.disk'\n",
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:     "cendotServiceName" => [
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:         [0] "%{[cendotEnhanced[0][cendevservicename]}",
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:         [1] "%{[cendotEnhanced[0][cendevservicename]}"
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:     ],
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:                  "tags" => [
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:         [0] "syslog"
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:     ]
    Jul 27 07:50:40 sealijvblog01 logstash[39452]: }
    Jul 27 07:50:40 sealijvblog01 logstash[39452]: {
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:             "cendotSID" => [
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:         [0] "%{[cendotEnhanced[0][cendevsid]}",
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:         [1] "%{[cendotEnhanced[0][cendevsid]}"
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:     ],
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:            "@timestamp" => 2018-07-05T11:03:11.155Z,
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:            "cendotFQDN" => [
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:         [0] "%{[cendotEnhanced[0][cendevfqdn]}",
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:         [1] "%{[cendotEnhanced[0][cendevfqdn]}"
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:     ],
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:             "@metadata" => {
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:         "dead_letter_queue" => {
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:              "entry_time" => 2018-07-05T11:03:27.940Z,
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:               "plugin_id" => "8dd52b09c1d7fb90affc84e3485088635706795b78d975ead53ea53db0b1c7e0",
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:             "plugin_type" => "elasticsearch",
    Jul 27 07:50:40 sealijvblog01 logstash[39452]:                  "reason" => "Could not index event to Elasticsearch. status: 400, action: [\"index\", {:_id=>nil, :_index=>\"logstash-%{[cendotEnhanced[0][cendevservicename]}-2018-07-05\", :_type=>\"doc\", :_routing=>nil}, #<LogStash::Event:0x251e3035>], response: {\"index\"=>{\"_index\"=>\"logstash-%{[cendotEnhanced[0][cendevservicename]}-2018-07-05\", \"_type\"=>\"doc\", \"_id\"=>nil, \"status\"=>400, \"error\"=>{\"type\"=>\"invalid_index_name_exception\", \"reason\"=>\"Invalid index name [logstash-%{[cendotEnhanced[0][cendevservicename]}-2018-07-05], must be lowercase\", \"index_uuid\"=>\"_na_\", \"index\"=>\"logstash-%{[cendotEnhanced[0][cendevservicename]}-2018-07-05\"}}}"
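A complete pipeline for the goal described in the thread (tag events whose device is not in the table and send them to a separate "unknown" index), added here as an example and not taken from any post above. It relies on the jdbc_static plugin's `default_hash` lookup option and `tag_on_default_use` setting, which mark an event whose lookup returned no rows (as the follow-up post suspects, `tag_on_failure` only fires when the lookup itself errors, not on an empty result set), and it writes the field references with a closing bracket after each path segment so they actually resolve. Elasticsearch hosts, the database host and the credentials are placeholders.

```conf
filter {
  if "syslog" in [tags] {
    jdbc_static {
      loaders => [ { id => "elkDevIndexAssoc" query => "select * from elkDevIndexAssoc" local_table => "elkDevIndexAssoc" } ]
      local_db_objects => [
        { name => "elkDevIndexAssoc"
          index_columns => ["cenDevIP"]
          columns => [ ["cenDevSID", "varchar(255)"], ["cenDevFQDN", "varchar(255)"],
                       ["cenDevIP", "varchar(255)"], ["cenDevServiceName", "varchar(255)"] ] }
      ]
      local_lookups => [
        { id => "localObjects"
          query => "select * from elkDevIndexAssoc WHERE cenDevIP = :host"
          parameters => { host => "[host]" }
          target => "cendotEnhanced"
          # No row for this IP: the plugin stores this hash in the target array instead
          default_hash => { "cendevfqdn" => "unknown"
                            "cendevsid" => "unknown"
                            "cendevservicename" => "unknown" } }
      ]
      tag_on_default_use => ["unknown_device"]   # added only when default_hash was used
      tag_on_failure => ["sql_failure"]          # added only when the lookup itself errors
      jdbc_user => "username"                     # placeholder credentials and DB host
      jdbc_password => "password"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_driver_library => "/usr/share/java/mysql-connector-java-8.0.11.jar"
      jdbc_connection_string => "jdbc:mysql://db.example.internal:3306/logstash?serverTimezone=Europe/Stockholm"
      loader_schedule => "*/5 * * * *"
    }
    # Each segment of a field reference is bracketed on its own: [a][0][b], not [a[0][b]
    mutate {
      add_field => {
        "cendotFQDN"        => "%{[cendotEnhanced][0][cendevfqdn]}"
        "cendotSID"         => "%{[cendotEnhanced][0][cendevsid]}"
        "cendotServiceName" => "%{[cendotEnhanced][0][cendevservicename]}"
      }
    }
    mutate { remove_field => ["cendotEnhanced"] }
  }
}
output {
  if "unknown_device" in [tags] or "sql_failure" in [tags] {
    elasticsearch { hosts => ["localhost:9200"] index => "unknown-%{+YYYY.MM.dd}" }
  } else {
    elasticsearch { hosts => ["localhost:9200"] index => "logstash-%{[cendotServiceName]}-%{+YYYY.MM.dd}" }
  }
}
```

Because every event now carries a resolved `cendotServiceName` (a real one or "unknown"), the invalid-index-name error seen in the dead letter queue cannot recur, and a growing "unknown" index is a direct signal that a host is sending logs without being registered in the device table.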
