How to grok then send to Kafka as JSON text?

In my Logstash config I want to read in syslog messages, use grok to parse the message text into a few field values, assemble those fields/values into a JSON string, and send that string to a Kafka topic. I must be missing something, because no matter what I do with my filter and output, all that ever reaches Kafka is the original message string. My logstash.conf is shown below, followed by an illustration of the output I am hoping to produce. Can anyone tell me what I need to do to send the JSON string to Kafka instead of the original message?

input {
    syslog {
        port => 9601
    }
}

filter {
    metrics {
        meter => "events"
        add_tag => "metric"
    }
    if ! ( "metric" in [tags] ) {
        grok {
            match => {
                "message" => [
                    "(?<eventTime>\|FIELD1\|.*\|)",
                    "(?<localIpAddress>\|FIELD2\|.*\|)"
                ]
            }
        }
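        # json_encode (logstash-filter-json_encode plugin): with no target,
        # it overwrites the source field with its JSON-encoded value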
        json_encode {
            source => "eventTime"
            add_field => {
                # "eventTime" => "%{eventTime}"
                "localIpAddress" => "%{localIpAddress}"
            }
        }
    }
}

output {
    # stdout { codec => line }

    # route events tagged 'metric' to stdout; everything else goes to Kafka
    if "metric" in [tags] {
        stdout {
            codec => line {
                format => "events/second: %{[events][rate_1m]}"
            }
        }
    } else {
        kafka {
            codec => json_lines

            bootstrap_servers => (redacted)
            sasl_jaas_config => (redacted)
            topic_id => (redacted)
            security_protocol => (redacted)
            sasl_mechanism => (redacted)
        }
    }
}
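To make the goal concrete, here is roughly what I am hoping to see. The message layout and values below are invented for illustration (the real format matches the grok patterns above but is redacted):

# example incoming message text
|FIELD1|2017-06-05T14:03:21Z||FIELD2|10.1.2.3|

# what I would like the Kafka record to contain
{ "eventTime": "2017-06-05T14:03:21Z", "localIpAddress": "10.1.2.3" }

My understanding is that with codec => json_lines the kafka output should serialize the whole event as JSON, so I suspect I need something along these lines in the filter section to drop the fields I don't want before the output (a sketch on my part, not something I have verified):

filter {
    mutate {
        # keep only the grokked fields: drop the original message text
        # (and presumably the syslog metadata fields as well)
        remove_field => [ "message" ]
    }
}

Is that the right direction, or is there a better way to emit just the extracted fields as JSON?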
