In my Logstash config I want to read syslog input, parse the message text to extract some field values, assemble those fields and values into a JSON string, and send that string to a Kafka topic. I must be missing something, because no matter how I change the filter and output, the only thing that ever reaches Kafka is my original message string. My logstash.conf is shown below. Can anyone tell me what I need to change so that the JSON string, rather than the original message, is sent to Kafka?
input {
  syslog {
    port => 9601
  }
}
filter {
  metrics {
    meter => "events"
    add_tag => "metric"
  }
  if ! ( "metric" in [tags] ) {
    grok {
      match => {
        "message" => [
          "(?<eventTime>\|FIELD1\|.*\|)",
          "(?<localIpAddress>\|FIELD2\|.*\|)"
        ]
      }
    }
    json_encode {
      source => "eventTime"
      add_field => {
        # "eventTime" => "%{eventTime}"
        "localIpAddress" => "%{localIpAddress}"
      }
    }
  }
}
output {
  # stdout { codec => line }
  # only emit events with the 'metric' tag
  if "metric" in [tags] {
    stdout {
      codec => line {
        format => "events/second: %{[events][rate_1m]}"
      }
    }
  } else {
    kafka {
      codec => json_lines
      bootstrap_servers => (redacted)
      sasl_jaas_config => (redacted)
      topic_id => (redacted)
      security_protocol => (redacted)
      sasl_mechanism => (redacted)
    }
  }
}
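
One direction that may be worth trying is sketched below. It is untested against this setup, and whether it cures the "only the original message arrives" symptom is an assumption. It relies on two documented behaviours. First, grok's `break_on_match` option defaults to true, so when several patterns are listed, grok stops after the first one that matches. Second, `json_encode` with only `source` set overwrites that one field with its JSON encoding rather than building an object out of several fields. The sketch drops `json_encode` and lets the output codec serialize the whole event instead. The Kafka connection settings are left out and would stay as in the config above.

```
filter {
  if ! ( "metric" in [tags] ) {
    grok {
      # Try every pattern instead of stopping at the first match,
      # so both eventTime and localIpAddress can be captured.
      break_on_match => false
      match => {
        "message" => [
          "(?<eventTime>\|FIELD1\|.*\|)",
          "(?<localIpAddress>\|FIELD2\|.*\|)"
        ]
      }
    }
  }
}
output {
  if ! ( "metric" in [tags] ) {
    kafka {
      # The json codec serializes the full event (all fields) as one JSON document.
      codec => json
      # bootstrap_servers, topic_id, and the SASL settings as in the original output
    }
  }
}
```

With this shape, the extracted fields would be expected to appear as keys of the JSON document sent to Kafka, next to `message` and the fields the syslog input adds. If only the extracted fields are wanted, the unwanted ones would still have to be removed in the filter stage.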