How to use the input Kafka topic name as the Elasticsearch output index in Logstash?

Here is my use case:
I use Kafka messages as the input data and output them to Elasticsearch. I consume multiple topics with the following settings:

input {
   kafka {
      bootstrap_servers => "127.0.0.1:9092"
      group_id => "im_group"
      auto_offset_reset => "latest" # automatically reset the offset to the latest offset
      consumer_threads => 3
      #decorate_events => true
      topics_pattern => "im_(.*)" # subscribe to topics whose names start with im_
      codec => json {
         charset => "UTF-8"
      }
   }
}

Now I want to use the topic name as the Elasticsearch index. For example, messages from a topic named im_test should be inserted into the im_test index, and messages from every other topic should be inserted into the index with the same name as their topic.

If you set decorate_events => true on the kafka input, then the topic name will be in [@metadata][kafka][topic]. You can then use

index => "%{[@metadata][kafka][topic]}"

on the elasticsearch output.
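
Putting the two pieces together, a minimal sketch of the full pipeline might look like the following. The kafka input settings are taken from the question; the hosts value is an assumption (a local Elasticsearch on the default port) and should be replaced with your own cluster address.

```
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    group_id => "im_group"
    topics_pattern => "im_(.*)"
    # Adds Kafka metadata, including the topic name, under [@metadata][kafka]
    decorate_events => true
    codec => json {
      charset => "UTF-8"
    }
  }
}

output {
  elasticsearch {
    # Assumption: Elasticsearch is running locally on the default port
    hosts => ["127.0.0.1:9200"]
    # Each event is written to an index named after its source topic
    index => "%{[@metadata][kafka][topic]}"
  }
}
```

Fields under [@metadata] are not sent to Elasticsearch, so the topic name is used for routing only and is not stored in the document. Elasticsearch index names must be lowercase, so this works as-is only for topics with lowercase names such as im_test.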

Thank you for your help. This works for me.