Hello all,
I am using ELK GA 6.0.0 with the Kafka input plugin. Below is my configuration:
input {
  kafka {
    group_id => "avro_test_group_1"
    topics => ["avro_test_topic"]
    bootstrap_servers => "192.168.0.1:9092"
    codec => avro {
      schema_uri => "/apps/GA6/logstash-6.0.0/avrotest.avsc"
    }
    key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
    value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
When I run Logstash, I get the following error:
[2018-02-23T10:12:36,501][ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.KafkaAvroDeserializer for configuration key.deserializer: Class io.confluent.kafka.serializers.KafkaAvroDeserializer could not be found., :cause=>nil}
[2018-02-23T10:12:36,503][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main
Plugin: <LogStash::Inputs::Kafka group_id=>"avro_test_group_1", topics=>["avro_test_topic"], bootstrap_servers=>"192.168.0.1:9092", codec=><LogStash::Codecs::Avro schema_uri=>"/apps/GA6/logstash-6.0.0/CONFIG_HOME/avrotest.avsc", id=>"d9f381c5-071b-4dba-b4c6-aef64ad8bfd9", enable_metric=>true, tag_on_failure=>false>, key_deserializer_class=>"io.confluent.kafka.serializers.KafkaAvroDeserializer", value_deserializer_class=>"io.confluent.kafka.serializers.KafkaAvroDeserializer", id=>"287ef03accc48f86451447c44a252069bdeba58fa022168edc6c9a28d4b94c3d", enable_metric=>true, auto_commit_interval_ms=>"5000", client_id=>"logstash", consumer_threads=>1, enable_auto_commit=>"true", poll_timeout_ms=>100, security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI", decorate_events=>false>
Error: uncaught throw org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.KafkaAvroDeserializer for configuration key.deserializer: Class io.confluent.kafka.serializers.KafkaAvroDeserializer could not be found.
Exception: UncaughtThrowError
Stack: org/jruby/RubyKernel.java:1137:in `throw'
/apps/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:329:in `create_consumer'
/apps/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:224:in `block in run'
org/jruby/RubyFixnum.java:305:in `times'
org/jruby/RubyEnumerator.java:323:in `each'
org/jruby/RubyEnumerable.java:830:in `map'
/apps/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:224:in `run'
/apps/GA6/logstash-6.0.0/logstash-core/lib/logstash/pipeline.rb:574:in `inputworker'
/apps/GA6/logstash-6.0.0/logstash-core/lib/logstash/pipeline.rb:567:in `block in start_input'
According to this, Logstash cannot find the deserializer class. I have the jar file kafka-avro-serializer-3.3.0.jar, which contains the class, and I have placed it in /apps/GA6/logstash-6.0.0/lib. Why is this still happening, and how can I fix it?
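For anyone who wants to rule out a bad jar first, here is a minimal Python sketch that checks whether a jar has an entry for a given class. The function name jar_contains_class is made up for illustration. It only inspects the archive contents; it says nothing about whether Logstash actually puts that directory on its classpath.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar (a zip archive) has an entry for the class.

    jar_contains_class is a hypothetical helper, not part of Logstash or Kafka.
    """
    # A fully qualified class name a.b.C is stored in the jar as a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


# Example call, using the path and class name from this post:
# jar_contains_class(
#     "/apps/GA6/logstash-6.0.0/lib/kafka-avro-serializer-3.3.0.jar",
#     "io.confluent.kafka.serializers.KafkaAvroDeserializer",
# )
```

If this returns True, the jar itself is fine and the problem is more likely that the class is not visible to the plugin at runtime.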
Thank you.