Logstash 2.3.1 Kafka plugins SSL support

Hello,
We use the Kafka input and output plugins with Logstash in our log pipeline. We are using version 2.3.1 of Logstash and Elasticsearch. I see that there is now Kafka 0.9 producer/consumer support in LS 5.0. Any chance you would backport that to 2.3.1? Or is there a way I can retrofit that plugin into my Logstash 2.3.1? We are using Kafka 0.9 and are guaranteed not to have any 0.8 brokers, so backward compatibility is not an issue for me.

We have already secured the pipeline from Filebeat to Logstash, and also from Logstash to Elasticsearch. We would like to do the same for the Kafka hop to get end-to-end SSL.

EDIT: of course, minutes after posting, I run into this:

So I am modifying my question to: when is this becoming GA? :)

Thanks
-Sundar

The Kafka 0.9 input and output plugins are now GA:

3.0.2 for logstash-input-kafka

3.0.0 for logstash-output-kafka
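
For the end-to-end SSL goal in the original question, here is a minimal sketch of what an SSL-enabled Kafka input might look like with these plugin versions. The `ssl` and `ssl_truststore_*` setting names are as I recall them from the 3.0.x plugin docs, so please verify them against the version you install. The truststore path and password are placeholders, and the sketch assumes the brokers expose an SSL listener on the port shown.

```
input {
  kafka {
    # Broker address taken from the thread; assumes this port is an SSL listener
    bootstrap_servers => "broker-1.service.local:4000"
    topics => ["logstash_logs"]
    group_id => "logstash"
    # Assumed 3.0.x setting names; check the docs for your installed version
    ssl => true
    # Hypothetical path to a JKS truststore containing the broker CA
    ssl_truststore_location => "/path/to/client.truststore.jks"
    # Placeholder password
    ssl_truststore_password => "changeit"
  }
}
```

If the brokers require client authentication, the corresponding `ssl_keystore_location` and `ssl_keystore_password` settings would likely be needed as well.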

Thanks so much!

@allenmchan @suyograo I tried switching to the recently GA'd 0.9 client-based Kafka plugins mentioned above, and I am getting some errors during config validation. Could you please help? Here are the details:

SUNDARA2-M-C1P6:~/dev/logstash/logstash-2.3.1/bin>./logstash-plugin install --version 3.0.2 logstash-input-kafka
Validating logstash-input-kafka-3.0.2
Installing logstash-input-kafka
Installation successful
SUNDARA2-M-C1P6:~/dev/logstash/logstash-2.3.1/bin>./logstash-plugin install --version 3.0.0 logstash-output-kafka
Validating logstash-output-kafka-3.0.0
Installing logstash-output-kafka
Installation successful
SUNDARA2-M-C1P6:~/dev/logstash/logstash-2.3.1/bin>./logstash -t -f test.conf
Unknown setting 'topic_id' for kafka {:level=>:error}
Unknown setting 'reset_beginning' for kafka {:level=>:error}
The given configuration is invalid. Reason: Something is wrong with your configuration. {:level=>:fatal}
SUNDARA2-M-C1P6:~/dev/logstash/logstash-2.3.1/bin>cat test.conf
input {
  heartbeat {
    interval => 10
    type => "heartbeat"
  }
  kafka {
    bootstrap_servers => "broker-1.service.local:4000,broker-2.service.local:4000,broker-3.service.local:4000"
    group_id => "logstash"
    topic_id => "logstash_logs"
    reset_beginning => false
    consumer_threads => 1
  }
}

output {
  if [type] == "heartbeat" {
    exec {
      codec => "json"
      command => "date +%s > /tmp/heartbeat"
    }
  } else {
    elasticsearch {
      hosts => ["elasticsearch-elk.service.local:9200"]
      user => "logstashuser"
      password => "logstashuser"
      index => "logstash-%{+YYYY.MM.dd}"
      ssl => true
      cacert => "/Users/sundara2/certs/serviceCA.crt"
    }
  }
}

I believe you are referring to the Logstash 2.3 docs? For these plugin versions you should be referring to the master or 5.x docs.

https://www.elastic.co/guide/en/logstash/master/plugins-inputs-kafka.html

As for the errors you are getting: the config settings you are using are not valid for the 0.9 plugin.

Thanks @allenmchan. It looks like the input setting is now called "topics", whereas the output setting is still called "topic_id". It would be great if they were the same. But at least it does not complain anymore.
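
For anyone else hitting the same validation errors, this is roughly what the Kafka sections look like after the rename. It is a sketch based on my test.conf above: the input takes `topics` as an array, while the output keeps `topic_id` as a single string. The output topic name is made up for illustration.

```
input {
  kafka {
    bootstrap_servers => "broker-1.service.local:4000,broker-2.service.local:4000,broker-3.service.local:4000"
    group_id => "logstash"
    # Input plugin 3.0.x: "topics" (array) replaces the old "topic_id"
    topics => ["logstash_logs"]
    consumer_threads => 1
  }
}

output {
  kafka {
    bootstrap_servers => "broker-1.service.local:4000"
    # Output plugin 3.0.0: still "topic_id" (single string); topic name is hypothetical
    topic_id => "logstash_out"
  }
}
```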

Also, it looks like reset_beginning is no longer a supported setting. What (if anything) is the equivalent in 3.0.x, please?

Thanks
-Sundar

Are you looking for "auto.offset.reset" set to "earliest"?

Yes, this is probably what I am looking for, thanks! I will read up a little more on the implications of not setting this to one of the options provided (reset_beginning used to accept "false"). In the 0.9 client we have to explicitly set it to one of 4 values, and that is not a bad thing.
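
In the plugin config I expect this to look something like the sketch below, assuming the input plugin exposes the consumer property as `auto_offset_reset` (please check the docs for your installed version). One caveat worth noting: in the 0.9 consumer this setting only applies when the group has no committed offset, or the committed offset is out of range, so it is not an exact replacement for the old reset_beginning.

```
input {
  kafka {
    bootstrap_servers => "broker-1.service.local:4000,broker-2.service.local:4000,broker-3.service.local:4000"
    group_id => "logstash"
    topics => ["logstash_logs"]
    # Assumed setting name, mapping to the consumer's auto.offset.reset.
    # "earliest" starts from the oldest available offset when no committed
    # offset exists for this group; "latest" starts from the newest.
    auto_offset_reset => "earliest"
    consumer_threads => 1
  }
}
```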

Thanks once again for your help!