Hi, we've hit a snag with the Logstash Kafka Avro codec: we are unable to transmit and receive doubles across a topic with it. Below is a test case, with data, that reproduces the problem. We're running Logstash 5.2.2 on RHEL 7.2 against Kafka 0.10.1.1. The pipeline reads a CSV file, uses mutate to convert each column to its corresponding type, posts the events to a Kafka topic, and then reads them back. Here is the transmitter conf file:
input {
  file {
    path => "/usr/share/logstash/avro-test/test-data.csv"
    type => "csv-file-input"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    columns => ["identifier", "name", "longitude", "latitude"]
    separator => ","
  }
  mutate {
    convert => {
      "identifier" => "string"
      "name" => "string"
      "longitude" => "float"
      "latitude" => "float"
    }
  }
}
output {
  stdout { codec => rubydebug }
  kafka {
    topic_id => "avro-test"
    bootstrap_servers => "10.10.10.99:9092"
    codec => avro {
      schema_uri => "/usr/share/logstash/avro-test/schema.avsc"
    }
  }
}
The Avro schema:
{"namespace": "com.systems.test.data",
"type": "record",
"name": "TestRecord",
"fields": [
{"name": "identifier", "type": ["string", "null"]},
{"name": "name", "type": ["string", "null"]},
{"name": "longitude", "type": ["double", "null"]},
{"name": "latitude", "type": ["double", "null"]}
]
}
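To rule out the schema itself, a row can be round-tripped through the avro gem directly (1.8.1, the version Logstash bundles), with no Kafka in between. This is just a minimal sketch, assuming the gem is on the load path; if the doubles survive this plain encode/decode, the schema is fine and the problem lies somewhere in transport:

require 'avro'
require 'stringio'

schema = Avro::Schema.parse(File.read('/usr/share/logstash/avro-test/schema.avsc'))

# Encode the first row of test-data.csv to Avro binary.
buffer = StringIO.new
writer = Avro::IO::DatumWriter.new(schema)
writer.write({ 'identifier' => 't1', 'name' => 'row1',
               'longitude' => 111.0123, 'latitude' => 21.01234 },
             Avro::IO::BinaryEncoder.new(buffer))

# Decode it straight back -- no Kafka, no Logstash codec in between.
buffer.rewind
reader = Avro::IO::DatumReader.new(schema, schema)
p reader.read(Avro::IO::BinaryDecoder.new(buffer))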
The receiver conf file:
input {
  kafka {
    topics => "avro-test"
    type => "kafka-input"
    bootstrap_servers => "10.10.10.99:9092"
    codec => avro {
      schema_uri => "/usr/share/logstash/avro-test/schema.avsc"
    }
  }
}
output {
  stdout { codec => rubydebug }
}
And the test data in test-data.csv:
t1,row1,111.0123,21.01234
t2,row2,112.0123,22.01234
t3,row3,113.0123,23.01234
The receiving Logstash terminates with the fatal error below. Reading the backtrace against avro 1.8.1's io.rb, read_union appears to pull a branch index out of the stream that selects no schema from the union, so match_schemas is handed nil; to us that suggests the bytes are corrupted or misaligned by the time they reach the Avro decoder:
[2017-03-17T07:39:17,776][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method
`type_sym' for nil:NilClass>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:224:in
`match_schemas'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:280:in `read_data'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:376:in `read_union'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:309:in `read_data'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:384:in `read_record'", "org/jruby/RubyArray.java:1613:in
`each'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:382:in `read_record'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:310:in `read_data'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:275:in `read'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-avro-3.1.0-java/lib/logstash/codecs/avro.rb:76:in `decode'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.6/lib/logstash/inputs/kafka.rb:241:in `thread_runner'",
"file:/usr/share/logstash/vendor/jruby/lib/jruby.jar!/jruby/java/java_ext/java.lang.rb:12:in `each'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.6/lib/logstash/inputs/kafka.rb:240:in `thread_runner'"]}
We also tried setting byte-array deserializers on the receiver and got the same error:
input {
  kafka {
    topics => "avro-test"
    type => "kafka-input"
    bootstrap_servers => "10.35.237.92:9092"
    key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => avro {
      schema_uri => "/usr/share/logstash/avro-test/schema.avsc"
    }
  }
}
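One thing we have not tried yet is the matching change on the transmit side: logstash-output-kafka defaults value_serializer to org.apache.kafka.common.serialization.StringSerializer, so if the payload is being mangled there, a byte-array deserializer on the input alone would not help. We have not confirmed that the plugin hands the Avro payload to the serializer as bytes, but presumably the output would look like this:

output {
  kafka {
    topic_id => "avro-test"
    bootstrap_servers => "10.10.10.99:9092"
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
    codec => avro {
      schema_uri => "/usr/share/logstash/avro-test/schema.avsc"
    }
  }
}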
Many thanks in advance for any assistance.