How to pull data data from 2 kafka topics using logstash and index the data in two separate index in elasticsearch


(Kamal) #1

I have a question regarding Kafka. I have create two topics called first_topic and second_topic

I have a logstash-kafka.conf file

input {
kafka {
bootstrap_servers => "servera:9092"
topics => ["first_topic"]
}
}

output {
elasticsearch {
hosts => ["elasticA:9200"]
index => "logstash-nco"
}

and I get the data fine in elasticsearch.

how can I pull the data from second_topic and insert into logstash-tiv index in elasticsearch? Do I have to create to separate conf file? or can I add the logic into the existing logstash-kafka.conf file? Please tell mehow can I pull the data from 2 separate topic and insert into 2 separate index using the same config file logstash-kafka.conf?


(Guy Boertje) #2

Put both topics in the topics setting.

input {
  kafka {
    bootstrap_servers => "servera:9092"
    topics => ["first_topic", "second_topic"]
  }
}

Internally, this happens.

event.set("[@metadata][kafka][topic]", record.topic)

Meaning you can then use conditional blocks to add a "index" field.

filter {
  if [@metadata][kafka][topic] == "first_topic" {
    clone {
     # does not clone if `clones` setting is not specified 
      add_field => { "[es_index]" => "logstash-nco"}
    }
  } else if [@metadata][kafka][topic] == "second_topic" {
    clone {
      add_field => { "[es_index]" => "logstash-tiv"}
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticA:9200"]
    index => "%{es_index}"
  }
}

However, if your topics are "nco" and "tiv" then its much simpler, no conditionals.

input {
  kafka {
    bootstrap_servers => "servera:9092"
    topics => ["nco", "tiv"]
    add_field => { "[es_index]" => "logstash-%{[@metadata][kafka][topic]}"}
  }
}
output {
  elasticsearch {
    hosts => ["elasticA:9200"]
    index => "%{es_index}"
  }
}

(Guy Boertje) #3

Before actually sending to ES, test with a debug output.

output {
  stdout {
    codec => rubydebug {metadata => true}
  }
}

(system) #4

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.