We have a TLS-enabled AWS MSK (Managed Streaming for Apache Kafka) 2.8.1 cluster and are using the Logstash Kafka integration plugin v11.3.2 to read from topics. Our Logstash input pipeline:
input {
  kafka {
    id => "sentinel_one-input_ulcmsk_800123458"
    group_id => "BBGROUP01"
    bootstrap_servers => "${BOOTSTRAP_SERVER}"
    consumer_threads => 5
    security_protocol => "SSL"
    ssl_truststore_location => "${msk_mtls_authentication_config_path}/kafka.client.truststore.jks"
    ssl_truststore_password => "********"
    ssl_truststore_type => "jks"
    ssl_keystore_location => "${msk_mtls_authentication_config_path}/kafka.client.keystore.jks"
    ssl_keystore_password => "${msk_ssl_keystore_password}"
    ssl_keystore_type => "jks"
    ssl_key_password => "${msk_ssl_key_password}"
    decorate_events => "extended"
    auto_offset_reset => "earliest"
    topics => ["800123458"]
    metadata_max_age_ms => 60000
    codec => line
  }
}
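For completeness, the output side of the test pipeline is just a bare stdout output, roughly like the sketch below (the rubydebug codec is an assumption for readability; any stdout codec shows the same symptom):

```
output {
  # Print each consumed event to the console for debugging
  stdout { codec => rubydebug }
}
```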
The problem: running with a stdout output produces no output, yet the offsets for topic 800123458 / group BBGROUP01 show CURRENT-OFFSET = LOG-END-OFFSET and no lag. In other words, the input has read the topic and is current, but no events are emitted. We can also see in our Logstash log that the consumer offsets are being committed:
Sending OFFSET_COMMIT ... committedOffset=nn ...
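For reference, this is roughly how we confirm the group's offsets and lag with the standard Kafka CLI (the SSL settings live in a client properties file; the file name and paths here are placeholders for our environment):

```shell
# Describe the consumer group to compare CURRENT-OFFSET with LOG-END-OFFSET;
# TLS keystore/truststore settings are supplied via --command-config
bin/kafka-consumer-groups.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER" \
  --describe --group BBGROUP01 \
  --command-config client-ssl.properties
```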
A note in https://github.com/logstash-plugins/logstash-integration-kafka/blob/main/lib/logstash/inputs/kafka.rb states: "We recommended that you use matching Kafka client and broker versions." However, I can't find a way to specify the (Logstash) Kafka client version, and the Kafka input plugin page in the Logstash Reference [8.11] states: "This plugin uses Kafka Client 3.4". Any suggestions for resolving this?