Hi everyone,
I am using ELK GA 6.0.0. I have avro records in my Kafka topic and I am trying to push them into elasticsearch using Logstash Kafka input plugin using Avro Codec. Below is my configuration, its pretty simple and minimal;
input {
kafka{
group_id => "group_1"
topics => ["topic_1"]
bootstrap_servers => "192.168.0.1:9092"
codec => avro {
schema_uri => "/files/GA6/logstash-6.0.0/CONFIG_HOME/myschema.avsc"
}
}
}
output{
stdout{
}
}
What I am getting is the below error, when I am trying to start Logstash (manual line break applied);
[2018-01-25T11:54:37,060][FATAL][logstash.runner ] An unexpected error occurred!
{:error=>#<ArgumentError: negative length -15 given>, :backtrace=>[
"org/jruby/ext/stringio/StringIO.java:788:in `read'",
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:106:in `read'",
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:93:in `read_bytes'",
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:99:in `read_string'",
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:299:in `read_data'",
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:384:in `block in read_record'",
"org/jruby/RubyArray.java:1734:in `each'",
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:382:in `read_record'",
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:310:in `read_data'",
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:275:in `read'",
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'",
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:254:in `block in thread_runner'",
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in thread_runner'"
]}
Below is my avsc;
{
"type": "record",
"name": "Sample",
"doc": "Sample Schema",
"fields": [{
"name": "name",
"type": "string"
}, {
"name": "address",
"type": "string"
}, {
"name": "salary",
"type": "long"
}
]
}
I have referred this question and added the below config to my input;
key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
Still, the issues persists. Why is this happening? How can I fix this?
Thank you.