Logstash Kafka Avro integration failing

Hi - I am trying to consume a topic from Kafka using Avro Deserializer in Logstash and getting the below error.

[ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :cause=>io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.}
[2019-07-26T16:58:22,736][ERROR][logstash.javapipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main

I have provided avro_uri in codec however, the settings is not been read by the logstash.

Here is my Logstash Config file
input {
kafka {
bootstrap_servers => "kafka1:9911,kafka2:9911,kafka3:9911"
topics => "Elastic_new"
#max_poll_records => "256"
auto_offset_reset => earliest
group_id => "logstash104"
ssl_truststore_location =>"/elasticsearch/logstash7.1.1/kafka_files/kafka.client.truststore.jks"
ssl_truststore_password => "abcdef"
security_protocol => "SSL"
#consumer_threads => 10
key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
#schema_registry_url => "https://kafka:9990"
codec => avro {
schema_uri => "/elasticsearch/logstash-7.1.1/kafka_files/ticketInfo.avsc"
tag_on_failure => true
#register_schema => true
#base64_encoding => false
}
}
}

output {
elasticsearch {
index => "topic_es2"
document_id => "%{tktnum}%"
action => "update"
doc_as_upsert => "true"
retry_on_conflict => 5
hosts => ["npes1:9200"]
}
stdout { codec => rubydebug }
}

Can someone please help with the issue?

Hi Everyone,
Here are the jars that i added in logstash to consume data from kafka.

kafka-avro-serializer-5.0.0.jar
kafka-clients-2.0.0.jar
common-config-5.0.0.jar
common-utils-5.0.0.jar
kafka-schema-registry-client-5.0.0.jar
avro-1.8.2.jar
avro-maven-plugin-1.8.2.jar
avro-compiler-1.8.2.jar
jackson-mapper-asl-1.9.13.jar
jackson-core-asl-1.9.13.jar

Is there an issue with the plugins that am using to consume data from Kafka using avro deserializer?

I don't think you can use both value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer" and the avro codec. Decoding twice?

As we don't have a setting that passes a value to the schema.registry.url property of the Java Kafka Client you will have to use the avro codec only.

1 Like

Thanks for the response. I have removed the deserializer class from the conf file as stated:
input {
kafka {
bootstrap_servers => "kafka1:9911,kafka2:9911,kafka3:9911"
topics => "Elastic_new"
#max_poll_records => "256"
auto_offset_reset => earliest
group_id => "logstash104"
ssl_truststore_location =>"/elasticsearch/logstash7.1.1/kafka_files/kafka.client.truststore.jks"
ssl_truststore_password => "abcdef"
security_protocol => "SSL"
codec => avro {
schema_uri => "/elasticsearch/logstash-7.1.1/kafka_files/ticketInfo.avsc"
tag_on_failure => true
}
}
}

This means the conf will be using the default string deserializer.

When i execute the script, got below output.

{
"ticketObjects" => ,
"@version" => "1",
"@timestamp" => 2019-07-30T15:51:07.676Z
}

1 Like

See this StackOverflow answer

You need to use the ByteArraySerializer in Kafka to preserve the binary data.

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.