Kafka input plugin does not set @metadata fields

The kafka input plugin does not appear to set the @metadata fields described in the Logstash documentation for the plugin.

I'm running Elasticsearch 6.2.2 and Logstash 6.2.2 from Docker images on a CentOS 7.4 server.

I wanted to set up a kafka input pipeline that would read from any topic defined in an array. Based on the documentation, I should have been able to set the index name used for Elasticsearch in the output plugin using this syntax:

output {
  elasticsearch {
    hosts => <list of hosts redacted>
    index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
  }
}

Instead, data sent to any of the topics lands in an index whose name is the literal string %{[@metadata][kafka][topic]}- followed by the date. The date part is substituted, but the topic part is not: I now have an index in my Elasticsearch cluster for today named %{[@metadata][kafka][topic]}-2018.03.22.

To test whether %{foo} substitution was working in the output plugin at all, I created three kafka input instances, one for each of the topics I wanted to read from, and added a field named "topic" to each, set to an Elasticsearch-compatible (lowercase) version of the topic name. This works: I get three indexes with the field value and the current date substituted in properly: test-YYYY.MM.dd, sensor_events-YYYY.MM.dd, and controller_log-YYYY.MM.dd.

Here's the workaround config:

input {

  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    topics => [ "test"]
    consumer_threads => 3
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "500"
    enable_auto_commit => true
    codec => json {
      charset => "ISO-8859-1"
    }
    add_field => { "topic" => "test" }
  }

  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    topics => [ "sensorEvents" ]
    consumer_threads => 3
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "500"
    enable_auto_commit => true
    codec => json {
      charset => "ISO-8859-1"
    }
    add_field => { "topic" => "sensor_events" }
  }

  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    topics => [ "controllerLog" ]
    consumer_threads => 3
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "500"
    enable_auto_commit => true
    codec => json {
      charset => "ISO-8859-1"
    }
    add_field => { "topic" => "controller_log" }
  }
}

output {
  elasticsearch {
    hosts => <elasticsearch_hosts_redacted>
    index => "%{[topic]}-%{+YYYY.MM.dd}"
  }
}

I also tried using a mutate filter to copy the contents of [@metadata][kafka][topic] into the "topic" field, and that doesn't work either. I can only conclude that [@metadata][kafka][topic] is never populated by the plugin.

Oy. That's frustrating.

Metadata is only added to the event if the decorate_events option is set to true (it defaults to false); we should definitely make it easier to find and make sense of what's included :weary:.

decorate_events

  • Value type is boolean
  • Default value is false

Option to add Kafka metadata like topic, message size to the event.

I've opened an issue on the plugin to improve the docs: logstash-plugins/logstash-input-kafka#255
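
This also accounts for the literal index name: when a referenced field is missing from the event, Logstash leaves the %{...} reference in the string as-is instead of raising an error. Here is a rough Ruby sketch of that behaviour. It is not Logstash's actual implementation; `substitute` is a made-up helper that only understands bracketed field references, and the date is hard-coded rather than produced by %{+YYYY.MM.dd}.

```ruby
# Simplified stand-in for Logstash's field-reference substitution.
# Assumption: this only mimics the observable behaviour for %{[a][b]} style
# references; it is NOT the real Logstash code.
def substitute(template, event)
  template.gsub(/%\{((?:\[[^\]]+\])+)\}/) do |match|
    # "[@metadata][kafka][topic]" -> ["@metadata", "kafka", "topic"]
    keys = Regexp.last_match(1).scan(/\[([^\]]+)\]/).flatten
    # Walk the nested hash; give up with nil as soon as a level is missing.
    value = keys.reduce(event) { |node, key| node.is_a?(Hash) ? node[key] : nil }
    # A missing field leaves the reference untouched in the output string.
    value.nil? ? match : value.to_s
  end
end

template = "%{[@metadata][kafka][topic]}-2018.03.22"

# decorate_events => false: no kafka metadata on the event
undecorated = {}
# decorate_events => true: the topic is available under @metadata
decorated = { "@metadata" => { "kafka" => { "topic" => "test" } } }

puts substitute(template, undecorated)  # %{[@metadata][kafka][topic]}-2018.03.22
puts substitute(template, decorated)    # test-2018.03.22
```

The first call reproduces the index name from the original post; the second shows what the same pattern yields once the metadata is present.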

Thanks! That would help. :smiley:

As you might expect, that was the issue. I thought I'd post the consolidated config file for others to look at, along with my solution for remapping the camel-case topic names to index names Elasticsearch will accept. Elasticsearch throws an error when an index name contains capital letters.

Here's the config that solved both problems:

input {

  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    topics => [ "test", "controllerLog" ]
    consumer_threads => 6
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "500"
    enable_auto_commit => true
    decorate_events => true
    codec => json {
      charset => "ISO-8859-1"
    }
  }

}

filter {
  ruby {
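    # Split the topic before each capital letter, lowercase the parts, and join
    # them with underscores (controllerLog -> controller_log). Events without a
    # kafka topic are skipped rather than raising an exception.
    code => "topic = event.get('[@metadata][kafka][topic]'); event.set('[@metadata][kafka][lc_topic]', topic.split(/(?=[A-Z])/).map { |x| x.downcase }.join('_')) unless topic.nil?"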
  }
}

output {
  elasticsearch {
    hosts => <elasticsearch_hosts_redacted>
    index => "%{[@metadata][kafka][lc_topic]}-%{+YYYY.MM.dd}"
  }
}
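
For anyone who wants to check the renaming outside of Logstash, the expression from the ruby filter can be tried in plain Ruby. This is just a sketch: `to_index_safe` is a name I made up for illustration, and the nil guard is my own addition.

```ruby
# Hypothetical helper wrapping the same expression used in the ruby filter.
def to_index_safe(topic)
  return nil if topic.nil?  # assumption: pass nil through instead of raising
  # Split before every capital letter, lowercase each piece, rejoin with "_".
  topic.split(/(?=[A-Z])/).map(&:downcase).join('_')
end

%w[test sensorEvents controllerLog].each do |topic|
  puts "#{topic} -> #{to_index_safe(topic)}"
end
```

One thing to keep in mind is that the split happens before every capital letter, so a topic name containing a run of capitals (an acronym, say) would be broken up letter by letter. For plain camel-case names like the ones above it gives test, sensor_events, and controller_log.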
