This config is not working. Has anyone made logstash work with Avro serialized kafka messages.
[2019-03-13T11:53:12,531][ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.KafkaAvroDeserializer for configuration key.deserializer: Class io.confluent.kafka.serializers.KafkaAvroDeserializer could not be found., :cause=>nil}
[2019-03-13T11:53:12,532][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin.
We don't need specific avro deserializer classes just need the ByteArrayDeserializer as it's stored in binary on the topic. All I care about is the value because the avro schema will define the key once in the pipeline.
note: might take a look at your avro schema path, I don't think you need that '=' character at the start
Below is the error I'm receiving when i use only the avro codec in the input config without specifying any deserialization class
[2019-03-13T13:29:51,596][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<RangeError: integer -159386219032568 too small to convert to int'>, :backtrace=>["org/jruby/ext/stringio/StringIO.java:785:inread'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:106:in read'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:93:inread_bytes'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:99:in read_string'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:299:inread_data'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:384:in block in read_record'", "org/jruby/RubyArray.java:1734:ineach'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:382:in read_record'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:310:inread_data'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:275:in read'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:indecode'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:258:in block in thread_runner'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:257:inblock in thread_runner'"]}
[2019-03-13T13:29:51,851][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
Below is the error I'm receiving when i use the avro codec in the input config specifying "value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer" as derserialization class
[2019-03-13T21:03:02,763][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<ArgumentError: negative length -594054 given>, :backtrace=>["org/jruby/ext/stringio/StringIO.java:788:in read'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:106:inread'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:93:in read_bytes'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:99:inread_string'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:299:in read_data'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:384:inblock in read_record'", "org/jruby/RubyArray.java:1734:in each'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:382:inread_record'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:310:in read_data'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:275:inread'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in decode'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:258:inblock in thread_runner'", "~/Documents/Elastic/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:257:in `block in thread_runner'"]}
[2019-03-13T21:03:02,879][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
Background-
The kafka msgs Im trying to read are Avro serialized using the class "io.confluent.kafka.serializers.KafkaAvroSerializer". Is there any way I can use the confluent "io.confluent.kafka.serializers.KafkaAvroDeserializer" class in logstash. Has anyone implemented such a thing.
If there is a way to use the standard binary serialization on the producer side things should work, but beyond that you are probably going to have to customize the LS plugin. Unless someone else in the community has some ideas...
Apache, Apache Lucene, Apache Hadoop, Hadoop, HDFS and the yellow elephant
logo are trademarks of the
Apache Software Foundation
in the United States and/or other countries.