Logstash Kafka Avro Codec Error with Doubles

Hi, we've hit a snag with the Logstash Kafka Avro codec: we are unable to transmit and receive doubles across a topic with it. Included below is a test case, along with data, to reproduce the problem. We're running Logstash 5.2.2 on RHEL 7.2 talking to Kafka 0.10.1.1. We read a CSV file, mutate each field to its corresponding type, post to a Kafka topic, and then read it back. Here is the transmitter conf file:

input {
   file {
     path => "/usr/share/logstash/avro-test/test-data.csv"
     type => "csv-file-input"
     start_position => "beginning"
     sincedb_path => "/dev/null"
   }
}
filter {
   csv {
     columns => [
        "identifier","name","longitude","latitude"
     ]
     separator => ","
   }
   mutate {
     convert => {
        "identifier" => "string"
        "name"       => "string"
        "longitude"  => "float"
        "latitude"   => "float"
     }
   }
}
output {
   stdout { codec => rubydebug }

   kafka {
     topic_id => "avro-test"
     bootstrap_servers => "10.10.10.99:9092"
     codec => avro {
        schema_uri => "/usr/share/logstash/avro-test/schema.avsc"
     }
   }
}

The Avro schema:

{"namespace": "com.systems.test.data",
 "type": "record",
 "name": "TestRecord",
 "fields": [
     {"name": "identifier", "type": ["string", "null"]},
     {"name": "name", "type": ["string", "null"]},
     {"name": "longitude", "type": ["double", "null"]},
     {"name": "latitude", "type": ["double", "null"]}
 ]
}

The receiver conf file:

input {
  kafka {
    topics => "avro-test"
    type => "kafka-input"
    bootstrap_servers => "10.10.10.99:9092"
    codec => avro {
       schema_uri => "/usr/share/logstash/avro-test/schema.avsc"
    }
  }
}
output {
  stdout { codec => rubydebug }
}

And the test data in test-data.csv:

t1,row1,111.0123,21.01234
t2,row2,112.0123,22.01234
t3,row3,113.0123,23.01234

Logstash terminates with the error:

[2017-03-17T07:39:17,776][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method
 `type_sym' for nil:NilClass>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:224:in
 `match_schemas'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:280:in `read_data'",
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:376:in `read_union'",
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:309:in `read_data'",
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:384:in `read_record'", "org/jruby/RubyArray.java:1613:in
 `each'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:382:in `read_record'",
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:310:in `read_data'",
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:275:in `read'",
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-avro-3.1.0-java/lib/logstash/codecs/avro.rb:76:in `decode'",
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.6/lib/logstash/inputs/kafka.rb:241:in `thread_runner'",
 "file:/usr/share/logstash/vendor/jruby/lib/jruby.jar!/jruby/java/java_ext/java.lang.rb:12:in `each'", 
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.6/lib/logstash/inputs/kafka.rb:240:in `thread_runner'"]}

We also tried adding the following deserializers and got the same error:

input {
  kafka {
    topics => "avro-test"
    type => "kafka-input"
    bootstrap_servers => "10.35.237.92:9092"
    key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => avro {
       schema_uri => "/usr/share/logstash/avro-test/schema.avsc"
    }
  }
}

Many thanks in advance for any assistance.


Thanks for the test data and conf. I was able to reproduce it.

I think this has to do with the fact that we use a string serializer in Kafka, which mangles Avro's binary encoding of doubles. Avro writes a double as 8 raw IEEE 754 bytes, which are generally not valid UTF-8, so pushing them through Kafka's StringSerializer/StringDeserializer corrupts them in transit; on decode, the corrupted stream then apparently fails to match any branch of the union schema, which is what surfaces as the NoMethodError on nil for type_sym. We could enhance the Kafka plugins (input and output) and the Avro codec to offer an option to pass raw Java byte arrays straight through.
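To make the suspected failure mode concrete, here is a minimal standalone Java sketch (the class name and hard-coded value are just for illustration) showing that a double's 8 Avro-encoded bytes do not survive the kind of UTF-8 round trip a string (de)serializer performs:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class LossyStringRoundTrip {
  public static void main(String[] args) {
    // Per the Avro spec, a double is encoded as 8 raw little-endian IEEE 754 bytes.
    byte[] avroBytes = ByteBuffer.allocate(8)
        .order(ByteOrder.LITTLE_ENDIAN)
        .putDouble(111.0123) // first longitude value from test-data.csv
        .array();

    // A string (de)serializer treats the payload as UTF-8 text. Byte sequences
    // that are not valid UTF-8 are replaced with U+FFFD, so the bytes that come
    // out are not the bytes Avro wrote.
    byte[] afterStringTrip = new String(avroBytes, StandardCharsets.UTF_8)
        .getBytes(StandardCharsets.UTF_8);

    System.out.println(Arrays.equals(avroBytes, afterStringTrip)); // false
  }
}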

So, if a user chooses org.apache.kafka.common.serialization.ByteArraySerializer, we should change the Avro codec to produce and consume byte arrays as well.
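For illustration, a sketch of what that pairing might look like on the output side, using the kafka output's value_serializer option to match the value_deserializer_class shown above on the input. This is the shape the proposed enhancement would take, not a working fix today, since the codec currently hands the plugin a string:

output {
  kafka {
    topic_id => "avro-test"
    bootstrap_servers => "10.10.10.99:9092"
    # pair with the ByteArrayDeserializer settings on the kafka input
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
    codec => avro {
      schema_uri => "/usr/share/logstash/avro-test/schema.avsc"
    }
  }
}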

The catch is that both StringSerializer and ByteArraySerializer would need to be supported.