NSK (NSK) - July 26, 2019, 9:13pm - #1

Hi - I am trying to consume a topic from Kafka using the Avro deserializer in Logstash and I am getting the error below.

[ERROR][logstash.inputs.kafka    ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :cause=>io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.}
[2019-07-26T16:58:22,736][ERROR][logstash.javapipeline    ] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main

I have provided schema_uri in the avro codec; however, the setting is not being read by Logstash.
Here is my Logstash config file:
input {
  kafka {
    bootstrap_servers => "kafka1:9911,kafka2:9911,kafka3:9911"
    topics => "Elastic_new"
    #max_poll_records => "256"
    auto_offset_reset => "earliest"
    group_id => "logstash104"
    ssl_truststore_location => "/elasticsearch/logstash7.1.1/kafka_files/kafka.client.truststore.jks"
    ssl_truststore_password => "abcdef"
    security_protocol => "SSL"
    #consumer_threads => 10
    key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
    value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
    #schema_registry_url => "https://kafka:9990"
    codec => avro {
      schema_uri => "/elasticsearch/logstash-7.1.1/kafka_files/ticketInfo.avsc"
      tag_on_failure => true
      #register_schema => true
      #base64_encoding => false
    }
  }
}
output {
  elasticsearch {
    index => "topic_es2"
    document_id => "%{tktnum}%"
    action => "update"
    doc_as_upsert => "true"
    retry_on_conflict => 5
    hosts => ["npes1:9200"]
  }
  stdout { codec => rubydebug }
}
NSK (NSK) - July 27, 2019, 2:08pm - #2

Can someone please help with the issue?
NSK (NSK) - July 29, 2019, 11:50pm - #3

Hi Everyone,
Here are the jars that I added to Logstash to consume data from Kafka:
kafka-avro-serializer-5.0.0.jar
kafka-clients-2.0.0.jar
common-config-5.0.0.jar
common-utils-5.0.0.jar
kafka-schema-registry-client-5.0.0.jar
avro-1.8.2.jar
avro-maven-plugin-1.8.2.jar
avro-compiler-1.8.2.jar
jackson-mapper-asl-1.9.13.jar
jackson-core-asl-1.9.13.jar
Is there an issue with the plugins I am using to consume data from Kafka with the Avro deserializer?

Reply #4

I don't think you can use both value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer" and the avro codec - that would decode the message twice.
As we don't have a setting that passes a value to the schema.registry.url property of the Java Kafka client, you will have to use the avro codec only.
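In other words, the kafka input keeps only the codec and drops the two deserializer_class settings. A minimal sketch, reusing the brokers, truststore, and schema path from your config above:

input {
  kafka {
    bootstrap_servers => "kafka1:9911,kafka2:9911,kafka3:9911"
    topics => "Elastic_new"
    group_id => "logstash104"
    auto_offset_reset => "earliest"
    security_protocol => "SSL"
    ssl_truststore_location => "/elasticsearch/logstash7.1.1/kafka_files/kafka.client.truststore.jks"
    ssl_truststore_password => "abcdef"
    # no key/value_deserializer_class here - the avro codec does the Avro decoding
    codec => avro {
      schema_uri => "/elasticsearch/logstash-7.1.1/kafka_files/ticketInfo.avsc"
      tag_on_failure => true
    }
  }
}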
            
              
                NSK  
                (NSK)
               
              
                  
                    July 30, 2019,  3:58pm
                   
                   
              5 
               
             
            
Thanks for the response. I have removed the deserializer class settings from the conf file as suggested:
input {
  kafka {
    bootstrap_servers => "kafka1:9911,kafka2:9911,kafka3:9911"
    topics => "Elastic_new"
    #max_poll_records => "256"
    auto_offset_reset => "earliest"
    group_id => "logstash104"
    ssl_truststore_location => "/elasticsearch/logstash7.1.1/kafka_files/kafka.client.truststore.jks"
    ssl_truststore_password => "abcdef"
    security_protocol => "SSL"
    codec => avro {
      schema_uri => "/elasticsearch/logstash-7.1.1/kafka_files/ticketInfo.avsc"
      tag_on_failure => true
    }
  }
}
This means the input will be using the default string deserializer.
When I execute the pipeline, I get the output below:

{
    "ticketObjects" => ,
         "@version" => "1",
       "@timestamp" => 2019-07-30T15:51:07.676Z
}

Reply #6

See this StackOverflow answer.
You need to use the ByteArrayDeserializer in the Kafka input to preserve the binary data.
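A sketch of how that might look in the kafka input, combining the ByteArrayDeserializer with the avro codec. The broker, truststore, and schema settings are the ones from the configs above, and whether base64_encoding needs to be disabled depends on how the messages were produced, so treat this as a starting point rather than a verified configuration:

input {
  kafka {
    bootstrap_servers => "kafka1:9911,kafka2:9911,kafka3:9911"
    topics => "Elastic_new"
    group_id => "logstash104"
    auto_offset_reset => "earliest"
    security_protocol => "SSL"
    ssl_truststore_location => "/elasticsearch/logstash7.1.1/kafka_files/kafka.client.truststore.jks"
    ssl_truststore_password => "abcdef"
    # hand the raw message bytes to the codec instead of the default StringDeserializer
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => avro {
      schema_uri => "/elasticsearch/logstash-7.1.1/kafka_files/ticketInfo.avsc"
      tag_on_failure => true
      #base64_encoding => false   # may be needed if the payload is raw Avro rather than base64-encoded
    }
  }
}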
            
              
                system  
                (system)
                  Closed 
               
              
                  
                    August 27, 2019,  4:28pm
                   
                   
              7 
               
             
            
              This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.