Send data to elasticsearch index from kafka that is named the same as the topic

As it says in the title I want to pull out the topic name from the incoming event and write to an index in elasticsearchthat is named the same as that topic name.

input {
kafka {
zk_connect => "zookeeper:2181"
group_id => "transport_logstash_elasticsearch_local_local"
white_list => ".*_json"
decorate_events => true
consumer_threads => 1
queue_size => 20
rebalance_max_retries => 4
rebalance_backoff_ms => 2000
consumer_timeout_ms => -1
consumer_restart_on_error => true
consumer_restart_sleep_ms => 0
fetch_message_max_bytes => 1048576
reset_beginning => false
type => "%{kafka.topic}"


#json {

source => "kafka"


output {
action => "index"
codec => "plain"
flush_size => 400
hosts => ["localhost:9200"]
idle_flush_time => 1
index => "%{kafka.topic}"
workers => 1

I thought the json filter plugin might be helpful (commented out in above code)but it seems to be unable to parse it:

Error parsing json {:source=>"kafka", :raw=>{"msg_size"=>9, "topic"=>"test_json", "consumer_group"=>"transport_logstash_elasticsearch_local_local", "partition"=>0, "offset"=>45, "key"=>nil}, :exception=>java.lang.ClassCastException: org.jruby.RubyHash cannot be cast to org.jruby.RubyIO, :level=>:warn}
^CSIGINT received. Shutting down the agent. {:level=>:warn}

Any help would be great. Thank you

It looks like the kafka field has already been parsed from JSON and that your json filter is unnecessary.

Also, please note that the notation for nested fields is [field][subfield] and not field.subfield.

Thank you very much Magnus - it works perfectly now.