Kafka AvroDeserializer - Logstash

input {
  kafka {
    group_id => "group1"
    topics => ["Topic1"]
    bootstrap_servers => "192.168.0.1:9092"
    codec => avro {
      schema_uri => "=/logstash-6.6.0/avrotest.avsc"
    }
    key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
    value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

This config is not working. Has anyone made Logstash work with Avro-serialized Kafka messages?

[2019-03-13T11:53:12,531][ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.KafkaAvroDeserializer for configuration key.deserializer: Class io.confluent.kafka.serializers.KafkaAvroDeserializer could not be found., :cause=>nil}
[2019-03-13T11:53:12,532][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin.

This is an example of our config:

value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
codec => avro {schema_uri => "/etc/logstash/ancillary/avro/incident/<schemafile>.avsc"}

We don't need the specific Avro deserializer classes, just the ByteArrayDeserializer, since the data is stored in binary form on the topic. All I care about is the value, because the Avro schema will define the key once it's in the pipeline.
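
Putting those two lines together with the kafka input, a complete block along those lines would look roughly like this (the broker, group, and topic are taken from the config above; the schema path is a placeholder):

input {
  kafka {
    group_id => "group1"
    topics => ["Topic1"]
    bootstrap_servers => "192.168.0.1:9092"
    # read the raw Avro bytes off the topic and let the codec do the decoding
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => avro {
      schema_uri => "/etc/logstash/ancillary/avro/incident/<schemafile>.avsc"
    }
  }
}

Note that the avro codec decodes against the local schema file, so this assumes the producer writes plain Avro binary rather than Confluent's schema-registry wire format.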

Note: you might want to take a look at your Avro schema path; I don't think you need that '=' character at the start.

Thanks Chris.

Below is the error I'm receiving when I use only the avro codec in the input config, without specifying any deserializer class:
[2019-03-13T13:29:51,596][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<RangeError: integer -159386219032568 too small to convert to `int'>, :backtrace=>["org/jruby/ext/stringio/StringIO.java:785:in `read'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:106:in `read'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:93:in `read_bytes'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:99:in `read_string'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:299:in `read_data'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:384:in `block in read_record'", "org/jruby/RubyArray.java:1734:in `each'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:382:in `read_record'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:310:in `read_data'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:275:in `read'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:258:in `block in thread_runner'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:257:in `block in thread_runner'"]}
[2019-03-13T13:29:51,851][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit


Below is the error I'm receiving when I use the avro codec in the input config with value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer" as the deserializer class:
[2019-03-13T21:03:02,763][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<ArgumentError: negative length -594054 given>, :backtrace=>["org/jruby/ext/stringio/StringIO.java:788:in `read'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:106:in `read'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:93:in `read_bytes'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:99:in `read_string'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:299:in `read_data'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:384:in `block in read_record'", "org/jruby/RubyArray.java:1734:in `each'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:382:in `read_record'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:310:in `read_data'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:275:in `read'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:258:in `block in thread_runner'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:257:in `block in thread_runner'"]}
[2019-03-13T21:03:02,879][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit


Background:

The Kafka messages I'm trying to read are Avro-serialized using the class "io.confluent.kafka.serializers.KafkaAvroSerializer". Is there any way I can use the Confluent "io.confluent.kafka.serializers.KafkaAvroDeserializer" class in Logstash? Has anyone implemented such a thing?

I am not certain that the logstash plugin will support that serialization: https://github.com/logstash-plugins/logstash-input-kafka/issues/37

If there is a way to use standard binary serialization on the producer side, things should work; beyond that you are probably going to have to customize the LS plugin, unless someone else in the community has some ideas...

The codec below worked for me. I'm posting it here in case anyone has a similar use case.
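
For anyone with a similar setup: Confluent's KafkaAvroSerializer prefixes each message with a magic byte and a 4-byte schema ID, which the stock avro codec does not understand and which is consistent with the decode errors above. One approach that can handle that framing (not necessarily the exact snippet referred to above) is the community logstash-codec-avro_schema_registry plugin, which strips the header and fetches the writer schema from the Schema Registry. A minimal sketch, assuming that plugin is installed and using a placeholder registry URL:

input {
  kafka {
    group_id => "group1"
    topics => ["Topic1"]
    bootstrap_servers => "192.168.0.1:9092"
    # hand the raw bytes to the codec; it reads the Confluent header
    # and looks the schema up in the Schema Registry
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => avro_schema_registry {
      endpoint_url => "http://localhost:8081"   # placeholder Schema Registry URL
    }
  }
}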
