The kafka input plugin does not appear to set the @metadata fields as described in this Logstash documentation.
I'm using Elasticsearch 6.2.2 and Logstash 6.2.2 on Docker images, running on a CentOS 7.4 server.
I wanted to set up a kafka input pipeline that reads from any topic defined in an array. Based on the documentation, I should be able to set the Elasticsearch index name in the output plugin using this syntax:
output {
  elasticsearch {
    hosts => <list of hosts redacted>
    index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
  }
}
Instead, the index name generated for data sent to any of the topics is the literal string %{[@metadata][kafka][topic]}- followed by the date. I now have an index in my Elasticsearch cluster for today named %{[@metadata][kafka][topic]}-2018.03.22.
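One way to confirm whether the plugin populates @metadata at all is a temporary stdout output with the rubydebug codec, which prints the @metadata fields when its metadata option is enabled (a diagnostic sketch, not part of my actual pipeline):

output {
  stdout {
    # prints the full event including [@metadata] so you can see
    # whether [@metadata][kafka][topic] is ever set
    codec => rubydebug { metadata => true }
  }
}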
To test whether %{foo} substitution works in the output plugin at all, I created three kafka input instances, one per topic I wanted to read from, and added a field named "topic" to each, set to an Elasticsearch-compatible string version of the topic name. This works: I get three indexes with the current date substituted in properly: test-YYYY.MM.dd, sensor_events-YYYY.MM.dd, and controller_log-YYYY.MM.dd.
Here's the workaround config:
input {
  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    topics => [ "test" ]
    consumer_threads => 3
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "500"
    enable_auto_commit => true
    codec => json {
      charset => "ISO-8859-1"
    }
    add_field => { "topic" => "test" }
  }
  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    topics => [ "sensorEvents" ]
    consumer_threads => 3
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "500"
    enable_auto_commit => true
    codec => json {
      charset => "ISO-8859-1"
    }
    add_field => { "topic" => "sensor_events" }
  }
  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    topics => [ "controllerLog" ]
    consumer_threads => 3
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "500"
    enable_auto_commit => true
    codec => json {
      charset => "ISO-8859-1"
    }
    add_field => { "topic" => "controller_log" }
  }
}
output {
  elasticsearch {
    hosts => <elasticsearch_hosts_redacted>
    index => "%{[topic]}-%{+YYYY.MM.dd}"
  }
}
I also tried a mutate filter that copies [@metadata][kafka][topic] into the "topic" field, and that doesn't work either. I can only conclude that [@metadata][kafka][topic] is never populated by the plugin.
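For reference, the mutate attempt looked essentially like this (a sketch reconstructed from the description above):

filter {
  mutate {
    # copy the plugin's metadata field (if it existed) into a regular field
    add_field => { "topic" => "%{[@metadata][kafka][topic]}" }
  }
}

With the metadata field unset, the sprintf reference again comes through as the literal string rather than a topic name, matching the behavior seen in the index names.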