Here is my use case:
I use Kafka messages as the input data and output them to an Elasticsearch database. I consume multiple topics with the settings below:
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    group_id => "im_group"
    auto_offset_reset => "latest" # automatically reset the offset to the latest offset
    consumer_threads => 3
    #decorate_events => true
    topics_pattern => "im_(.*)" # subscribe to topics whose names start with im_
    codec => json {
      charset => "UTF-8"
    }
  }
}
Now I want to use the topic name as the Elasticsearch index name. For example, messages from a topic named im_test should be inserted into the im_test index, and messages from every other topic should be inserted into the index that has the same name as their topic.
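
To make the goal concrete, here is a minimal sketch of the kind of pipeline I have in mind. It is not a verified solution. It assumes a reasonably recent Kafka input plugin, where enabling decorate_events exposes the topic name as [@metadata][kafka][topic] (older plugin versions may put it in a different field), and the Elasticsearch address 127.0.0.1:9200 is a placeholder I made up for illustration:

```
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    group_id => "im_group"
    topics_pattern => "im_(.*)"
    # Assumption: with this enabled, the plugin adds Kafka metadata
    # (including the topic name) under [@metadata][kafka].
    decorate_events => true
    codec => json {
      charset => "UTF-8"
    }
  }
}

output {
  elasticsearch {
    # Placeholder address, not taken from my actual setup.
    hosts => ["127.0.0.1:9200"]
    # Use the source topic name as the index name,
    # so events from im_test would go to the im_test index.
    index => "%{[@metadata][kafka][topic]}"
  }
}
```

Since Elasticsearch index names must be lowercase, this would only work directly for topics whose names are already lowercase, such as im_test.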