Logstash with Kafka: Unable to decode avro


#1

I am trying to consume serialized Avro events from a Kafka queue. The queue is populated by a simple Java producer. For clarity, I am sharing the relevant components:

Avro schema file

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Logstash Config file

input {
  kafka {
    zk_connect => "localhost:2181"
    topic_id => "MemoryTest"
    type => "standard_event"
    group_id => "butiline_dash_prod"
    reset_beginning => true
    auto_offset_reset => smallest
    codec => {
      avro => {
        schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
      }
    }
  }
}

output {
  if [type] == "standard_event" {
    stdout {
      codec => rubydebug
    }
  }
}

Problem

The pipeline fails at the Logstash level. When a new event is pushed into Kafka, I get the following on the Logstash console:

Alyssa�blue {:exception=>#<NoMethodError: undefined method `decode' for ["avro", {"schema_uri"=>"/opt/ELK/logstash-1.5.4/bin/user.avsc"}]:Array>, :backtrace=>["/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:169:in `queue_event'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:139:in `run'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:177:in `inputworker'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:171:in `start_input'"], :level=>:error}


#2

Finally figured out the error. Instead of this (as suggested on the Logstash website, https://www.elastic.co/guide/en/logstash/current/plugins-codecs-avro.html):

codec => {
  avro => {
    schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
  }
}

The correct syntax is (as suggested in the plugin's documentation https://github.com/logstash-plugins/logstash-codec-avro/blob/master/DEVELOPER.md):

codec => avro {
  schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
}

I guess the syntax has changed.
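
For anyone hitting the same NoMethodError: judging from the error message, the nested form appears to be read as a plain hash rather than as a codec plugin, so the kafka input ends up calling decode on an Array (["avro", {...}]) instead of on a codec object. That is my reading of the message, not something confirmed in the plugin source. Below is a sketch of the full input block from post #1 with only the codec syntax changed, assuming every other setting stays as it was:

```
input {
  kafka {
    zk_connect => "localhost:2181"
    topic_id => "MemoryTest"
    type => "standard_event"
    group_id => "butiline_dash_prod"
    reset_beginning => true
    auto_offset_reset => smallest
    # Codec plugin syntax: the plugin name followed directly by its settings block
    codec => avro {
      schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
    }
  }
}
```

The output block does not need to change; the rubydebug codec there already uses the bare plugin-name form.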

