Logstash Kafka Avro integration failing

Hi - I am trying to consume a topic from Kafka using Avro Deserializer in Logstash and getting the below error.

[ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :cause=>io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.}
[2019-07-26T16:58:22,736][ERROR][logstash.javapipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main

I have provided avro_uri in codec however, the settings is not been read by the logstash.

Here is my Logstash Config file
input {
kafka {
bootstrap_servers => "kafka1:9911,kafka2:9911,kafka3:9911"
topics => "Elastic_new"
#max_poll_records => "256"
auto_offset_reset => earliest
group_id => "logstash104"
ssl_truststore_location =>"/elasticsearch/logstash7.1.1/kafka_files/kafka.client.truststore.jks"
ssl_truststore_password => "abcdef"
security_protocol => "SSL"
#consumer_threads => 10
key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
#schema_registry_url => "https://kafka:9990"
codec => avro {
schema_uri => "/elasticsearch/logstash-7.1.1/kafka_files/ticketInfo.avsc"
tag_on_failure => true
#register_schema => true
#base64_encoding => false
}
}
}

output {
elasticsearch {
index => "topic_es2"
document_id => "%{tktnum}%"
action => "update"
doc_as_upsert => "true"
retry_on_conflict => 5
hosts => ["npes1:9200"]
}
stdout { codec => rubydebug }
}

Can someone please help with the issue?

Hi Everyone,
Here are the jars that i added in logstash to consume data from kafka.

kafka-avro-serializer-5.0.0.jar
kafka-clients-2.0.0.jar
common-config-5.0.0.jar
common-utils-5.0.0.jar
kafka-schema-registry-client-5.0.0.jar
avro-1.8.2.jar
avro-maven-plugin-1.8.2.jar
avro-compiler-1.8.2.jar
jackson-mapper-asl-1.9.13.jar
jackson-core-asl-1.9.13.jar

Is there an issue with the plugins that am using to consume data from Kafka using avro deserializer?

I don't think you can use both value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer" and the avro codec. Decoding twice?

As we don't have a setting that passes a value to the schema.registry.url property of the Java Kafka Client you will have to use the avro codec only.

Thanks for the response. I have removed the deserializer class from the conf file as stated:
input {
kafka {
bootstrap_servers => "kafka1:9911,kafka2:9911,kafka3:9911"
topics => "Elastic_new"
#max_poll_records => "256"
auto_offset_reset => earliest
group_id => "logstash104"
ssl_truststore_location =>"/elasticsearch/logstash7.1.1/kafka_files/kafka.client.truststore.jks"
ssl_truststore_password => "abcdef"
security_protocol => "SSL"
codec => avro {
schema_uri => "/elasticsearch/logstash-7.1.1/kafka_files/ticketInfo.avsc"
tag_on_failure => true
}
}
}

This means the conf will be using the default string deserializer.

When i execute the script, got below output.

{
"ticketObjects" => ,
"@version" => "1",
"@timestamp" => 2019-07-30T15:51:07.676Z
}

See this StackOverflow answer

You need to use the ByteArraySerializer in Kafka to preserve the binary data.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.