Kafka input plugin cannot parse key or value due to message ID prefix

The key and value of each topic message consumed by the Kafka input plugin are prefixed with what look like message IDs (five leading bytes), which causes the JSON codec to fail.

Here is an example from the Logstash log, with decorate_events => "extended", as produced by the rubydebug codec:

{
     "@timestamp" => 2023-07-17T12:47:20.774284604Z,
      "@metadata" => {
         "kafka" => {
                      "topic" => "private.cdc.xxx-archive.01.Event_zu_Werk",
                        "key" => "\u0000\u0000\u0000\u0000\u000F{\"_type\":\"Event_zu_Werk\",\"_domain\":\"FM_XXX-Archive\",\"Event_ID\":7216,\"Werk_ID\":6279}",
                  "timestamp" => 1689423775741,
                     "offset" => 21628540,
             "consumer_group" => "logstash",
                  "partition" => 0
         }
     },
       "@version" => "1",
        "message" => "\u0000\u0000\u0000\u0000\u0010{\"Werk_ID\":6279,\"Relation_Erstellung_Zeitstempel\":1649936217000,\"Event_ID\":7216,\"Relation_Dauer_Ende\":null,\"Relation_Dauer_Beginn\":null,\"Relation_Werk_Event\":\"exhibited at\",\"_type\":\"Event_zu_Werk\",\"_domain\":\"FM_XXX-Archive\"}",
           "tags" => [
         [0] "_jsonparsefailure"
     ]
 }

The input configuration I am using:

input {
  kafka {
    bootstrap_servers => "broker-1:9092"
    topics => [ "private.cdc.xxx-archive.01.Event_zu_Werk" ]
    security_protocol => "SSL"
    ssl_keystore_type => "jks"
    ssl_truststore_type => "jks"
    ssl_keystore_location => "/client_certs/client.keystore.jks"
    ssl_truststore_location => "/client_certs/client.truststore.jks"
    ssl_key_password => "*******************"
    ssl_keystore_password => "*******************"
    ssl_truststore_password => "*******************"
    auto_offset_reset => "earliest"
    decorate_events => "extended"

    codec => json {
      charset => "UTF-8" # is default
    }
  }
}

I am running Logstash 8.8.2 and Confluent Kafka 7.3.4, so as far as I know these should be compatible.

What am I missing to get JSON parsing working with the Kafka input plugin?

Thanks for any helpful hints.
Andreas

Version 7.3.4 is EOL and no longer supported. Please upgrade ASAP.

(This is an automated response from your friendly Elastic bot. Please report this post if you have any suggestions or concerns :elasticheart: )

Version 7.3.4 in my message refers to Kafka, not to the Elastic Stack!
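
For anyone landing here later: the five bytes in front of each JSON document in the rubydebug output (\u0000\u0000\u0000\u0000\u000F on the key, \u0000\u0000\u0000\u0000\u0010 on the value) have the shape of Confluent Schema Registry framing, that is, a zero "magic" byte followed by a 4-byte big-endian schema ID. That reading is an assumption; nothing in this thread confirms how the topic was produced. Under that assumption, a minimal Python sketch of splitting the header from the payload could look like this (the helper name split_framed_json is made up for illustration and is not part of Logstash or any Kafka client):

```python
import json
import struct


def split_framed_json(raw: bytes):
    """Split an assumed 5-byte header (1 magic byte + 4-byte big-endian ID)
    from the JSON document that follows it."""
    if len(raw) < 5:
        raise ValueError("payload is shorter than the 5-byte header")
    # ">bI": big-endian, one signed byte, then one unsigned 32-bit integer
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, json.loads(raw[5:].decode("utf-8"))


# Key bytes as shown in the rubydebug output of the original post
key = (
    b"\x00\x00\x00\x00\x0f"
    b'{"_type":"Event_zu_Werk","_domain":"FM_XXX-Archive",'
    b'"Event_ID":7216,"Werk_ID":6279}'
)

schema_id, doc = split_framed_json(key)
print(schema_id, doc["Event_ID"])  # prints: 15 7216
```

This only illustrates why a plain JSON parser chokes on the raw bytes: the document itself is valid JSON once the first five bytes are skipped. Whatever fix is applied on the Logstash side would have to remove or interpret that header before the JSON parse runs.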
