Hi guys. There had meet a problem when use Avro framework serialize codec for output message to Kafka
. The Logstash
configure file setting like below:
input {
file {
path => "/data/tmp/*.log"
start_position => beginning
}
}
filter {
}
output {
stdout {
codec => plain {
format => "%{message}"
}
}
kafka {
client_id => "logstash-test"
bootstrap_servers => "xxxx:9093,xxxx:9092"
topic_id => "logstash-test"
compression_type => "snappy"
retries => 5
message_key => "logstash-test"
codec => avro {
schema_uri => "/root/User.avsc"
}
}
}
But there doesn't output any info when increase the tmp folder log files, some error message alert:
[2017-02-06T14:50:28,340][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<Avro::IO::AvroTypeError: The datum {"path"=>"/data/tmp/logstash-hdfs.log", "@timestamp"=>2017-02-06T06:50:27.923Z, "@version"=>"1", "host"=>"master01", "message"=>"{'name': 'Users3', 'age': 28, 'sex': 'M', 'supp': {'cn': 'Chinese Language', 'cid': '1024'}}!!--0", "tags"=>["_jsonparsefailure"]} is not an example of schema {"type":"record","name":"User","namespace":"io.github.elkan1788.alltest.avro","fields":[{"name":"name","type":"string"},{"name":"age","type":["int","null"]},{"name":"sex","type":["string","null"]},{"name":"supp","type":[{"type":"map","values":"string"}]}]}>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:543:in `write_data'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:538:in `write'", "/usr/share/logstash/plugin/logstash-codec-avro/lib/logstash/codecs/avro.rb:81:in `encode'", "/usr/share/logstash/plugin/logstash-output-kafka/lib/logstash/outputs/kafka.rb:201:in `receive'", "/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb:92:in `multi_receive'", "org/jruby/RubyArray.java:1613:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb:92:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:12:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/output_delegator.rb:42:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:331:in `output_batch'", "org/jruby/RubyHash.java:1342:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:330:in `output_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:288:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:258:in `start_workers'"]}
Then I try to change the Kafka
output codec setting like this:
# This setting can worked fine
codec => plain {
format => "%{message}"
}
So there can receive message from Logstash
, I use the Kafka
console shell output those message on real time. I think if there Avro
codec can support format maybe can resolved this, actually it can't support this. So if I want to use the Avro
framework, what should I do?
Note:
Logstash5.2
Kafka_0.10.1.1
Logstash-output-kafka-6.1.3