As the title says, I want to pull the topic name out of each incoming Kafka event and write the event to an Elasticsearch index named after that topic.
input {
  kafka {
    zk_connect => "zookeeper:2181"
    group_id => "transport_logstash_elasticsearch_local_local"
    white_list => ".*_json"
    decorate_events => true
    consumer_threads => 1
    queue_size => 20
    rebalance_max_retries => 4
    rebalance_backoff_ms => 2000
    consumer_timeout_ms => -1
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 0
    fetch_message_max_bytes => 1048576
    reset_beginning => false
    type => "%{kafka.topic}"
  }
}
filter {
  #json {
  #  source => "kafka"
  #}
}
output {
  elasticsearch {
    action => "index"
    codec => "plain"
    flush_size => 400
    hosts => ["localhost:9200"]
    idle_flush_time => 1
    index => "%{kafka.topic}"
    workers => 1
  }
}
I thought the json filter plugin might help (it is commented out in the config above), but when enabled it fails to parse the kafka field:
Error parsing json {:source=>"kafka", :raw=>{"msg_size"=>9, "topic"=>"test_json", "consumer_group"=>"transport_logstash_elasticsearch_local_local", "partition"=>0, "offset"=>45, "key"=>nil}, :exception=>java.lang.ClassCastException: org.jruby.RubyHash cannot be cast to org.jruby.RubyIO, :level=>:warn}
^CSIGINT received. Shutting down the agent. {:level=>:warn}
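Looking at the :raw dump in that warning, the kafka field is already a hash (with topic, partition, offset, and so on) rather than a JSON string, which would explain why the json filter cannot parse it. A minimal sketch of what might work instead, assuming Logstash's bracket syntax for nested field references (%{[kafka][topic]}) in place of the dotted form; the host is carried over from the config above, and I have not verified this against this exact plugin version:

```
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # Assumption: decorate_events => true places the topic at [kafka][topic],
    # as the :raw hash in the warning suggests. No json filter is needed,
    # because the field is already structured.
    index => "%{[kafka][topic]}"
  }
}
```

With the event from the warning, this reference would resolve to the index name test_json, if the assumption about the field layout holds.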
Any help would be great. Thank you.