Version: 2.4.1
Operating System: Ubuntu 16.04
input:
# Kafka consumer input (Logstash 2.4 kafka plugin, ZooKeeper-based offsets).
kafka {
    zk_connect       => "10.19.176.68:2181"
    topic_id         => "lion_request-log"
    group_id         => "logstash"
    codec            => plain
    # Do not rewind to the earliest offset on (re)start.
    reset_beginning  => false
    consumer_threads => 16
    decorate_events  => false
    add_field => {
        # Values must be quoted strings: the filter and output sections
        # compare these fields against the string "1" (e.g.
        # [json_parse] == "1"); an unquoted 1 is added as an integer and
        # those conditionals would never match, silently disabling both
        # the JSON filter chain and the elasticsearch output.
        "es_one"     => "1"
        "json_parse" => "1"
    }
}
filter:
# Only events flagged by the kafka input's add_field go through JSON parsing.
if [json_parse] == "1" {
    # Case-normalize the raw payload before parsing.
    # lowercase takes an array of field names.
    mutate {
        lowercase => ["message"]
    }
    # Parse the JSON payload into a nested field instead of the event root.
    json {
        source => "message"
        target => "@parsed"
    }
    # Best-effort parsing: strip the failure tag so unparseable events
    # still flow through to the output instead of being flagged.
    if "_jsonparsefailure" in [tags] {
        mutate {
            remove_tag => ["_jsonparsefailure"]
        }
    }
    mutate {
        # Nested field references must use the [parent][child] syntax;
        # the original "@parsed[date_time]" form is not a valid field
        # reference, so these renames would silently do nothing.
        rename => {
            "[@parsed][date_time]" => "date_time"
            "[@parsed][level]"     => "level"
            "[@parsed][unique_id]" => "unique_id"
            "[@parsed][primary]"   => "primary"
            "[@parsed][type]"      => "type"
            "[@parsed][info]"      => "info"
        }
        add_field => {
            "hostname" => "%{host}"
        }
    }
}
output:
# Route events flagged for the primary Elasticsearch cluster.
if [es_one] == "1" {
    elasticsearch {
        hosts           => ["10.19.22.142:9200"]
        sniffing        => true
        manage_template => false
        # Bulk-flush tuning (flush_size / idle_flush_time are 2.x-era options).
        flush_size      => 15000
        idle_flush_time => 10
        # Target index and type come from the parsed JSON payload —
        # assumes the source documents carry @index/@type keys; confirm
        # against the producer.
        index           => "%{[@parsed][@index]}"
        document_type   => "%{[@parsed][@type]}"
    }
}