The key and value of the topic messages consumed by the Kafka input plugin are prefixed with message IDs, which causes the JSON parser to fail. Here is an example from the Logstash log, with decorate_events => "extended", produced by the rubydebug codec:
{
        "@timestamp" => 2023-07-17T12:47:20.774284604Z,
         "@metadata" => {
        "kafka" => {
                     "topic" => "private.cdc.xxx-archive.01.Event_zu_Werk",
                       "key" => "\u0000\u0000\u0000\u0000\u000F{\"_type\":\"Event_zu_Werk\",\"_domain\":\"FM_XXX-Archive\",\"Event_ID\":7216,\"Werk_ID\":6279}",
                 "timestamp" => 1689423775741,
                    "offset" => 21628540,
            "consumer_group" => "logstash",
                 "partition" => 0
        }
    },
          "@version" => "1",
           "message" => "\u0000\u0000\u0000\u0000\u0010{\"Werk_ID\":6279,\"Relation_Erstellung_Zeitstempel\":1649936217000,\"Event_ID\":7216,\"Relation_Dauer_Ende\":null,\"Relation_Dauer_Beginn\":null,\"Relation_Werk_Event\":\"exhibited at\",\"_type\":\"Event_zu_Werk\",\"_domain\":\"FM_XXX-Archive\"}",
              "tags" => [
        [0] "_jsonparsefailure"
    ]
}
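Looking at the prefixes, both key and value start with five extra bytes: four zero bytes followed by \u000F (15) on the key and \u0010 (16) on the value. My guess (an assumption on my part, not confirmed) is that this is the Confluent Schema Registry wire format: one magic byte (0) followed by a big-endian 4-byte schema ID. A quick sketch that decodes the prefixes under that assumption:

```python
import struct

def parse_wire_header(prefix: bytes):
    """Decode a 5-byte Confluent-style wire header (assumed format):
    1 magic byte + 4-byte big-endian unsigned schema ID."""
    magic, schema_id = struct.unpack(">bI", prefix)
    return magic, schema_id

# The prefixes as seen in the log above
print(parse_wire_header(b"\x00\x00\x00\x00\x0f"))  # key   -> (0, 15)
print(parse_wire_header(b"\x00\x00\x00\x00\x10"))  # value -> (0, 16)
```

If that reading is right, 15 and 16 would be schema IDs in the registry, and the json codec fails because it sees those raw header bytes before the `{`.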
The input configuration I am using:
input {
  kafka {
    bootstrap_servers => "broker-1:9092"
    topics => [ "private.cdc.xxx-archive.01.Event_zu_Werk" ]
    security_protocol => "SSL"
    ssl_keystore_type => "jks"
    ssl_truststore_type => "jks"
    ssl_keystore_location => "/client_certs/client.keystore.jks"
    ssl_truststore_location => "/client_certs/client.truststore.jks"
    ssl_key_password => "*******************"
    ssl_keystore_password => "*******************"
    ssl_truststore_password => "*******************"
    auto_offset_reset => "earliest"
    decorate_events => "extended"
    codec => json {
      charset => "UTF-8" # is default
    }
  }
}
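One workaround I am considering (untested sketch; it assumes the prefix is always exactly five bytes) is to consume with the plain codec so the raw bytes survive, strip the header in a ruby filter, and only then parse the JSON:

```
input {
  kafka {
    # ...same connection settings as above, but no json codec;
    # ISO-8859-1 is an assumption, chosen to keep the raw bytes intact
    codec => plain { charset => "ISO-8859-1" }
  }
}
filter {
  # drop the assumed 5-byte header before parsing
  ruby {
    code => 'm = event.get("message"); event.set("message", m.byteslice(5, m.bytesize - 5)) if m'
  }
  json {
    source => "message"
  }
}
```

That feels like a hack, though, so I would prefer a proper solution on the input side if one exists.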
I am running Logstash version 8.8.2 and Confluent Kafka version 7.3.4, so as far as I know these should be compatible.
What am I missing to get JSON parsing working with the Kafka input plugin?
Thanks for any helpful hints.
Andreas