I have tried this conf but not working
input {
kafka {
...
decorate_events => true
}
}
filter {
mutate {
add_field => {"[@metadata][topic]" => "%{[kafka][topic]}"}
}
}
output {
elasticsearch {
document_type => "%{[@metadata][topic]}"
...
}
}
result of "%{[@metadata][topic]}"
"%{[kafka][topic]}"