Where should I place jar file containing de-serializer class for Kafka input plugin?

Hi everyone,

I am using ELK GA 6.0.0. I am consuming messages from my Kafka topic; below is my configuration:

input {
	kafka {
		group_id => "group_1"
		topics => ["topic_1"]
		bootstrap_servers => "192.168.0.1:9092"
		codec => avro {
			schema_uri => "/files/GA6/logstash-6.0.0/CONFIG_HOME/cnslog.avsc"
		}
		key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
		value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
	}
}

I am getting the following error:

[2018-01-25T15:45:00,710][ERROR][logstash.inputs.kafka    ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.KafkaAvroDeserializer for configuration key.deserializer: Class io.confluent.kafka.serializers.KafkaAvroDeserializer could not be found., :cause=>nil}
[2018-01-25T15:45:00,713][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:main
  Plugin: <LogStash::Inputs::Kafka group_id=>"group_1", topics=>["topic_1"], bootstrap_servers=>"192.168.0.1:9092", enable_auto_commit=>"false", codec=><LogStash::Codecs::Avro schema_uri=>"/files/GA6/logstash-6.0.0/CONFIG_HOME/cnslog.avsc", id=>"a3017e72-dd37-42f8-b560-46474401c4d4", enable_metric=>true, tag_on_failure=>false>, key_deserializer_class=>"io.confluent.kafka.serializers.KafkaAvroDeserializer", value_deserializer_class=>"io.confluent.kafka.serializers.KafkaAvroDeserializer", id=>"abf04c1ea239b1ea986a786e5e760b93edf36a9e0bae13d888dfe35e1f9762fd", enable_metric=>true, auto_commit_interval_ms=>"5000", client_id=>"logstash", consumer_threads=>1, poll_timeout_ms=>100, security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI", decorate_events=>false>
  Error: uncaught throw org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.KafkaAvroDeserializer for configuration key.deserializer: Class io.confluent.kafka.serializers.KafkaAvroDeserializer could not be found.
  Exception: UncaughtThrowError
  Stack: org/jruby/RubyKernel.java:1137:in `throw'
/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:329:in `create_consumer'
/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:224:in `block in run'
org/jruby/RubyFixnum.java:305:in `times'
org/jruby/RubyEnumerator.java:323:in `each'
org/jruby/RubyEnumerable.java:830:in `map'
/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:224:in `run'
/files/GA6/logstash-6.0.0/logstash-core/lib/logstash/pipeline.rb:574:in `inputworker'
/files/GA6/logstash-6.0.0/logstash-core/lib/logstash/pipeline.rb:567:in `block in start_input'

I have the jar file kafka-avro-serializer-3.2.0.jar, which contains these classes, and I have placed it in the folder /files/GA6/logstash-6.0.0.
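To confirm the class really is inside the jar, I can list its entries (a quick sketch, assuming the jar sits in /files/GA6/logstash-6.0.0 as above). And since simply dropping the jar in that folder does not seem to put it on the JVM classpath, is the right approach something like exporting CLASSPATH before starting Logstash? For example (the pipeline file name here is just a placeholder):

```shell
cd /files/GA6/logstash-6.0.0

# check that the deserializer class is actually packaged in the jar
unzip -l kafka-avro-serializer-3.2.0.jar | grep KafkaAvroDeserializer

# guess: expose the jar to Logstash's JVM via CLASSPATH before starting it
# (not sure this is the supported mechanism for the kafka input plugin)
export CLASSPATH=/files/GA6/logstash-6.0.0/kafka-avro-serializer-3.2.0.jar
bin/logstash -f CONFIG_HOME/my-pipeline.conf
```

Or does the jar need to go somewhere under the plugin's own vendor directory instead?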

Why is this error happening? How can I fix this?

Thank you.
