Kafka input to logstash

Hi All,

i am trying to add multiple kafka topics to logstash and output to elastic. my requirement here is, the index name should be the topic name, as below...

input {
  kafka {
    bootstrap_servers => "broker.kafka-elk.l4lb.thisdcos.directory:9092"
    topics => ["cid-test-express","cid-test-default"]
    group_id => "test-consumer-group"
#    auto_offset_reset => "earliest"
    decorate_events => true
    codec => json
}
}
filter {
grok {
        match => { "[log][file][path]" => "/mnt/volume0/cid-v[0-9]-.*-service/services/(?<version>[^/]+)/" }
}
}

output {
if [@metadata][topic] == "cid-test-express" and [fields][log_type] == "express" {
elasticsearch {
hosts => ["es-master1.cidaas-elk.l4lb.thisdcos.directory:9200","es-master2.cidaas-elk.l4lb.thisdcos.directory:9200","es-master3.cidaas-elk.l4lb.thisdcos.directory:9200"]
index => "%{[@metadata][kafka][topic]}-%{version}-%{+YYYY.MM.dd}"
}
}
if [@metadata][topic] == "cid-test-default" and [fields][log_type] == "default" {
elasticsearch {
hosts => ["es-master1.cidaas-elk.l4lb.thisdcos.directory:9200","es-master2.cidaas-elk.l4lb.thisdcos.directory:9200","es-master3.cidaas-elk.l4lb.thisdcos.directory:9200"]
index => "%{[@metadata][kafka][topic]}-%{version}-%{+YYYY.MM.dd}"
}
}
}

only first if statement works, but not the second one, second topic index is not created!!! can any one help me regarding this!!! thanks in advance

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.