How can I use the Avro codec when sending messages to Kafka?


(Elkan1788) #1

Hi guys. I have run into a problem using the Avro codec to serialize output messages to Kafka. My Logstash configuration file looks like this:

input {
    file {
        path => "/data/tmp/*.log"
        start_position => beginning 
    }
}

filter {

}

output {
    stdout {
        codec => plain {
            format => "%{message}"
        }
    }

    kafka {
        client_id => "logstash-test"
        bootstrap_servers => "xxxx:9093,xxxx:9092"
        topic_id => "logstash-test"
        compression_type => "snappy"
        retries => 5
        message_key => "logstash-test"
        codec => avro { 
            schema_uri => "/root/User.avsc"
        }
    }
}

But nothing is output when I add log files to the tmp folder. Instead, Logstash reports this error:

[2017-02-06T14:50:28,340][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<Avro::IO::AvroTypeError: The datum {"path"=>"/data/tmp/logstash-hdfs.log", "@timestamp"=>2017-02-06T06:50:27.923Z, "@version"=>"1", "host"=>"master01", "message"=>"{'name': 'Users3', 'age': 28, 'sex': 'M', 'supp': {'cn': 'Chinese Language', 'cid': '1024'}}!!--0", "tags"=>["_jsonparsefailure"]} is not an example of schema {"type":"record","name":"User","namespace":"io.github.elkan1788.alltest.avro","fields":[{"name":"name","type":"string"},{"name":"age","type":["int","null"]},{"name":"sex","type":["string","null"]},{"name":"supp","type":[{"type":"map","values":"string"}]}]}>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:543:in `write_data'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:538:in `write'", "/usr/share/logstash/plugin/logstash-codec-avro/lib/logstash/codecs/avro.rb:81:in `encode'", "/usr/share/logstash/plugin/logstash-output-kafka/lib/logstash/outputs/kafka.rb:201:in `receive'", "/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb:92:in `multi_receive'", "org/jruby/RubyArray.java:1613:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb:92:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:12:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/output_delegator.rb:42:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:331:in `output_batch'", "org/jruby/RubyHash.java:1342:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:330:in `output_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:288:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:258:in `start_workers'"]}
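
To make the error easier to read: the schema expects top-level fields `name`, `age`, `sex` and `supp`, while the event only has Logstash metadata plus the whole log line as one string under `message`. The following is a minimal Python sketch of that comparison, using the schema and event copied from the error above. It only compares field names and is not a full Avro validation; `missing_fields` is a hypothetical helper name.

```python
import json

# Schema copied from the error message above.
SCHEMA = json.loads(
    '{"type":"record","name":"User","namespace":"io.github.elkan1788.alltest.avro",'
    '"fields":[{"name":"name","type":"string"},{"name":"age","type":["int","null"]},'
    '{"name":"sex","type":["string","null"]},'
    '{"name":"supp","type":[{"type":"map","values":"string"}]}]}'
)

# Event fields as reported in the error message.
# (The timestamp is written as a plain string here for simplicity.)
EVENT = {
    "path": "/data/tmp/logstash-hdfs.log",
    "@timestamp": "2017-02-06T06:50:27.923Z",
    "@version": "1",
    "host": "master01",
    "message": "{'name': 'Users3', 'age': 28, 'sex': 'M', "
               "'supp': {'cn': 'Chinese Language', 'cid': '1024'}}!!--0",
    "tags": ["_jsonparsefailure"],
}


def missing_fields(schema, event):
    """Return the schema field names that are absent from the event's top level."""
    return [field["name"] for field in schema["fields"] if field["name"] not in event]


print(missing_fields(SCHEMA, EVENT))  # ['name', 'age', 'sex', 'supp']
```

None of the schema's fields exist at the top level of the event, so the Avro writer has nothing it can serialize as a `User` record.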

Then I tried changing the Kafka output codec setting like this:

# This setting works fine
codec => plain {
    format => "%{message}"
}

With this setting Kafka does receive the messages from Logstash; I used the Kafka console consumer to print them in real time. I thought that if the Avro codec supported a format option it might solve this, but it does not. So if I want to use Avro, what should I do?

Note:

Logstash 5.2
Kafka 0.10.1.1
logstash-output-kafka 6.1.3


(Elkan1788) #2

There is also another problem at the same time: how can I decode the Avro-encoded data? It seems it cannot be decoded on the consumer side with a Storm scheme.

Someone else has run into the same problem, but there is no real answer:

not-able-to-read-kafka-avro-schema-messages
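
On the decoding question, one thing worth checking is the payload encoding. As far as I know, the logstash-codec-avro versions of that period Base64-encode the Avro binary before handing it to the output, so a consumer that expects raw Avro bytes would fail to read it. Treat that as an assumption to verify against your plugin version. Under that assumption, here is a minimal standard-library Python sketch that Base64-decodes a message and then reads the fields in schema order, following the Avro binary encoding rules. The function names are hypothetical, and the reader is hard-wired to the `User` schema from the error message in post #1.

```python
import base64
import io


def read_long(buf):
    """Read one zigzag-encoded variable-length integer (Avro int and long)."""
    shift = 0
    acc = 0
    while True:
        chunk = buf.read(1)
        if not chunk:
            raise EOFError("truncated Avro data")
        acc |= (chunk[0] & 0x7F) << shift
        if not chunk[0] & 0x80:  # high bit clear: last byte of this number
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1)  # undo zigzag encoding


def read_string(buf):
    """Read a length-prefixed UTF-8 string."""
    length = read_long(buf)
    return buf.read(length).decode("utf-8")


def read_nullable(buf, read_value):
    """Read a union [T, "null"]: branch 0 holds a T, branch 1 is null."""
    return read_value(buf) if read_long(buf) == 0 else None


def read_string_map(buf):
    """Read a map<string>: blocks of key/value pairs, ended by a zero count."""
    result = {}
    while True:
        count = read_long(buf)
        if count == 0:
            return result
        if count < 0:
            count = -count
            read_long(buf)  # a negative count is followed by the block size in bytes
        for _ in range(count):
            key = read_string(buf)
            result[key] = read_string(buf)


def decode_user(payload):
    """Decode one Base64-wrapped Avro `User` record (assumed payload format)."""
    buf = io.BytesIO(base64.b64decode(payload))
    user = {
        "name": read_string(buf),
        "age": read_nullable(buf, read_long),
        "sex": read_nullable(buf, read_string),
    }
    read_long(buf)  # union index for "supp"; its only branch is the map
    user["supp"] = read_string_map(buf)
    return user
```

In a Storm scheme the same two steps would apply: Base64-decode the message bytes first, then pass the result to an Avro decoder that uses the same schema as the producer.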


(Elkan1788) #3

OK, I just resolved this problem myself. Here is the relevant config, FYI. Thanks.

input {
    file {
        path => "/data/tmp/*.log"
        start_position => beginning
        codec => "json"
    }
}

filter {

    json {
        remove_field => [ "path","@timestamp","@version","host" ]
        source => message
    }
}

output {
    stdout {}

    kafka {
#        acks => 0
        client_id => "logstash-test"
        bootstrap_servers => "xxxx:9093,xxxx:9092"
        topic_id => "logstash-test"
        compression_type => "snappy"
        retries => 5
        codec => avro { 
            schema_uri => "/root/User.avsc" 
        }
    }
}
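
One caveat with the `json` codec in this config: every log line has to be valid JSON. The sample line in the error from post #1 uses single quotes and ends with `!!--0`, which matches the `_jsonparsefailure` tag on that event. Whether the log format was changed as part of this fix is not stated in the thread, so the following Python sketch is only an illustration of the difference, with a hypothetical helper `is_valid_json_line`.

```python
import json


def is_valid_json_line(line):
    """Return True if the line parses as a single JSON document."""
    try:
        json.loads(line)
        return True
    except json.JSONDecodeError:
        return False


# Log line as it appears in the error message from post #1.
original = ("{'name': 'Users3', 'age': 28, 'sex': 'M', "
            "'supp': {'cn': 'Chinese Language', 'cid': '1024'}}!!--0")

# The same record written as JSON (double quotes, no trailing text).
record = {
    "name": "Users3",
    "age": 28,
    "sex": "M",
    "supp": {"cn": "Chinese Language", "cid": "1024"},
}
fixed = json.dumps(record)

print(is_valid_json_line(original))  # False
print(is_valid_json_line(fixed))     # True
```

A line like `fixed` parses into top-level `name`, `age`, `sex` and `supp` fields, which is the shape the `User` schema expects.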

(system) #4

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.