Kafka integration plugin v11.3.2 with AWS MSK 2.8.1

We have a TLS-enabled AWS MSK (Managed Streaming for Apache Kafka) 2.8.1 cluster and are using the Logstash Kafka integration plugin v11.3.2 to read from topics. Our Logstash input pipeline:

input {
    kafka {
        id => "sentinel_one-input_ulcmsk_800123458"
        group_id => "BBGROUP01"
        bootstrap_servers => "${BOOTSTRAP_SERVER}"
        consumer_threads => 5
        security_protocol => "SSL"
        ssl_truststore_location => "${msk_mtls_authentication_config_path}/kafka.client.truststore.jks"
        ssl_truststore_password => "********"
        ssl_truststore_type => "jks"
        ssl_keystore_location => "${msk_mtls_authentication_config_path}/kafka.client.keystore.jks"
        ssl_keystore_password => "${msk_ssl_keystore_password}"
        ssl_keystore_type => "jks"
        ssl_key_password => "${msk_ssl_key_password}"
        decorate_events => "extended"
        auto_offset_reset => "earliest"
        topics => ["800123458"]
        metadata_max_age_ms => 60000
        codec => line
    }
}
The problem: running with a stdout output produces no output, but the offsets for topic 800123458 / group BBGROUP01 show CURRENT-OFFSET = LOG-END-OFFSET and no lag. In other words, the input has read the topic and is current, yet no events are produced. We can see in our Logstash log that the consumer offsets are being updated:
Sending OFFSET_COMMIT ... committedOffset=nn ...
A note in https://github.com/logstash-plugins/logstash-integration-kafka/blob/main/lib/logstash/inputs/kafka.rb states "We recommended that you use matching Kafka client and broker versions.", but I can't find a way to specify a (Logstash) client version. The Kafka input plugin page in the Logstash Reference [8.11] states: "This plugin uses Kafka Client 3.4". Any suggestions for resolving this?

Update: Through trial and error, I commented out the "codec => line" in the kafka input and everything works now. Does this make any sense to anyone?
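For reference, this is roughly what the working input looks like with that line removed (TLS and decoration options elided; names and variables carried over from the config above). With no codec specified, the default plain codec applies:

```
input {
    kafka {
        group_id => "BBGROUP01"
        bootstrap_servers => "${BOOTSTRAP_SERVER}"
        topics => ["800123458"]
        # codec => line   <- removed: Kafka records arrive already framed,
        #                    so the default plain codec emits one event per record
    }
}
```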

Yes. The default codec for a kafka input is plain. That codec is used for inputs where the transport (Kafka, MQ, Redis, etc.) has its own framing protocol: the input is handed an entire event and decodes it as-is. The line codec, by contrast, collects data from the input and waits for a \n before flushing what it has collected. If none of your events contain an embedded \n, it will just accumulate data until the JVM runs out of memory.
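The difference can be sketched in a few lines of Python. This is not the plugin's actual code, just a model of the two buffering strategies described above:

```python
def plain_codec(chunks):
    """Model of the plain codec: each transport-framed chunk is one event."""
    return list(chunks)

def line_codec(chunks):
    """Model of the line codec: buffer input, flush an event per newline."""
    events, buffer = [], ""
    for chunk in chunks:
        buffer += chunk
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            events.append(line)
    # Anything left in the buffer stays there until a "\n" arrives.
    return events, buffer

# Kafka records typically have no embedded newlines:
messages = ["first event", "second event"]
print(plain_codec(messages))        # two events, one per record
print(line_codec(messages))         # no events; the buffer just keeps growing
```

With newline-free records, `line_codec` never flushes anything, which matches the observed behavior: offsets advance (the records were consumed) while no events ever reach the output.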
