The kafka input plugin does not appear to set the @metadata fields as described in this Logstash documentation.
I'm using Elasticsearch 6.2.2 and Logstash 6.2.2 on Docker images, running on a CentOS 7.4 server.
I wanted to set up a kafka input pipeline that would read from any topic defined in an array. Based on the documentation, I should have been able to set the index name used for Elasticsearch in the output plugin using this syntax:
output {
  elasticsearch {
    hosts => <list of hosts redacted>
    index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
  }
}
Instead, the index name for data sent to any of the topics is the literal string %{[@metadata][kafka][topic]}- followed by the date, so the field reference is never resolved. I now have an index in my Elasticsearch cluster for today named %{[@metadata][kafka][topic]}-2018.03.22.
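For context, a single kafka input reading all three topics from one array could look like the sketch below. This is a reconstruction, not the exact config: the topic names come from the workaround further down, and the remaining options are assumed to match it.

```
input {
  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    # All three topics in one array instead of one kafka block per topic
    topics => [ "test", "sensorEvents", "controllerLog" ]
    consumer_threads => 3
    auto_offset_reset => "latest"
    codec => json {
      charset => "ISO-8859-1"
    }
    # No add_field here: the index name is expected to come from
    # [@metadata][kafka][topic] in the elasticsearch output
  }
}
```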
To test whether %{foo} substitution was working in the output plugin at all, I created three kafka input instances, one for each topic I wanted to read from, and had each one add a field named "topic" whose value is an Elasticsearch-compatible (lowercase) version of the topic name. This works: I get three indexes with the current date substituted in properly: test-YYYY.MM.dd, sensor_events-YYYY.MM.dd, and controller_log-YYYY.MM.dd.
Here's the workaround config:
input {
  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    topics => [ "test" ]
    consumer_threads => 3
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "500"
    enable_auto_commit => true
    codec => json {
      charset => "ISO-8859-1"
    }
    add_field => { "topic" => "test" }
  }
  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    topics => [ "sensorEvents" ]
    consumer_threads => 3
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "500"
    enable_auto_commit => true
    codec => json {
      charset => "ISO-8859-1"
    }
    add_field => { "topic" => "sensor_events" }
  }
  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    topics => [ "controllerLog" ]
    consumer_threads => 3
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "500"
    enable_auto_commit => true
    codec => json {
      charset => "ISO-8859-1"
    }
    add_field => { "topic" => "controller_log" }
  }
}
output {
  elasticsearch {
    hosts => <elasticsearch_hosts_redacted>
    index => "%{[topic]}-%{+YYYY.MM.dd}"
  }
}
I also tried using a mutate filter to set the "topic" field to the contents of [@metadata][kafka][topic], and that doesn't work either. I can only conclude that [@metadata][kafka][topic] is never populated by the plugin.
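A mutate filter of that kind could look like the sketch below. The exact filter isn't reproduced here, so the use of add_field is an assumption. The stdout output is an addition for illustration only: with the rubydebug codec's metadata option enabled it prints the @metadata contents alongside each event, which is one way to check whether the field is present.

```
filter {
  mutate {
    # Assumed form of the attempted filter: fill "topic" from the Kafka metadata.
    # If the reference cannot be resolved, the field holds the literal
    # %{[@metadata][kafka][topic]} string instead of the topic name.
    add_field => { "topic" => "%{[@metadata][kafka][topic]}" }
  }
}
output {
  # Debugging aid (not part of the original config): print each event
  # including its @metadata, which is normally hidden from outputs
  stdout {
    codec => rubydebug { metadata => true }
  }
}
```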
