OK, Just resolved this problem by myself. There show some related config FYI. Thanks.
input {
file {
path => "/data/tmp/*.log"
start_position => beginning
codec => "json"
}
}
filter {
json {
remove_field => [ "path","@timestamp","@version","host" ]
source => message
}
}
output {
stdout {}
kafka {
# acks => 0
client_id => "logstash-test"
bootstrap_servers => "xxxx:9093,xxxx:9092"
topic_id => "logstash-test"
compression_type => "snappy"
retries => 5
codec => avro {
schema_uri => "/root/User.avsc"
}
}
}