How can I use the Avro codec to serialize messages when outputting to Kafka?

OK, I just resolved this problem myself. Here is the relevant config, FYI. Thanks.

# Source of events: log files matched by the glob below, decoded with the
# json codec.
input {
    file {
        codec          => "json"
        path           => "/data/tmp/*.log"
        # Start reading matched files from their beginning.
        start_position => "beginning"
    }
}

# Parse the "message" field as JSON and drop Logstash bookkeeping fields so
# they are not carried along with the event.
filter {

    json {
        source       => "message"
        # NOTE(review): the file input already uses `codec => "json"`; if that
        # decoding succeeds there may be no "message" field left to parse here,
        # in which case this filter (and its remove_field) may not apply.
        # Confirm whether both the codec and this filter are needed.
        remove_field => [
            "path",
            "@timestamp",
            "@version",
            "host"
        ]
    }
}

# Emit every event twice: once to stdout (handy for debugging) and once to
# Kafka, serialized with the Avro codec.
output {
    stdout {}

    kafka {
        # Broker list and client identity ("xxxx" hosts are placeholders).
        bootstrap_servers => "xxxx:9093,xxxx:9092"
        client_id         => "logstash-test"
        topic_id          => "logstash-test"

        # Producer tuning.
        # acks => 0
        retries           => 5
        compression_type  => "snappy"

        # Serialize each event using the Avro schema file below.
        # NOTE(review): depending on the avro codec / kafka output plugin
        # versions, the payload may be base64 text rather than raw Avro
        # binary unless configured otherwise -- confirm what consumers expect.
        codec => avro {
            schema_uri => "/root/User.avsc"
        }
    }
}
1 Like