Reading from specific Kafka partitions


I'm trying to consume Kafka data hosted on an on-prem Kafka bus using Logstash in a cloud-hosted environment. There is a single massive topic with about 100 partitions, the messages I need are all located in a single partition. Is there any way to have my Kafka input plugin filter the messages 'at the source', i.e. make sure only the data for that partition gets transferred in order to reduce bandwidth consumption? For now I'm doing the filtering like below (simplified for readability) by filtering on a messagekey, but it is my understanding that this will transfer all data and then filter in my cloud environment.

input {
  kafka {
    auto_offset_reset => "${kafka_auto_offset_reset}"           # where to start if this consumer group never ran before
    bootstrap_servers => "${kafka_bootstrap_servers}"           # kafka bootstrap servers
    decorate_events => "${decorate_events}"                     # append kafka metadata to the output. If false, disable the associated mutator in the filter section
    group_id => "${kafka_group_id}"                             # identifier of this listener group. Used to register and remember offsets in case of crashes
    security_protocol => "SSL"
    ssl_keystore_location => "/usr/share/logstash/auth/keystore.jks"
    ssl_keystore_password => "${ssl_keystore_password}"
    ssl_keystore_type => "JKS"
    ssl_truststore_location => "/usr/share/logstash/config/cacerts"
    ssl_truststore_password => "${ssl_truststore_password}"
    ssl_truststore_type => "JKS"
    topics => "${kafka_topic}"                                  # kafka topic to read

filter {
  mutate {
    copy => { "[@metadata][kafka]" => "kafka" }
    add_field => {"[@metadata][messagekey_filter]" => "${kafka_messagekey}"}

output {
  if [@metadata][kafka][key] == [@metadata][messagekey_filter] {
  elasticsearch {
    hosts => ["${elasticsearch_host}"]
    index => "${elasticsearch_index}"
    user => "${elasticsearch_user}"
    password => "${elasticsearch_password}"
    ssl => true
    cacert => "mycert.pem"
    ilm_enabled => "false"
    manage_template => "false"

It seems very odd (bordering on wrong) that you should care to be reading only from a single partition within a topic; but I can imagine why you might want to do this (and sympathise). If you are wanting to tackle scaling issues you should be looking at the notion of consumer groups, which will auto-balance partitions around the available consumers in the group.

While you could filter on [@metadata][kafka][partition], you are correct that it will still transfer the data in the input.

Within a consumer group, consumers don't even get to say which partitions they get assigned, but a consumer can read from an explicit partition; an example using kakfa console consumer looks like the following:

kafka-console-consumer --topic example-topic --bootstrap-server broker:9092 \
 --property print.key=true \
 --property key.separator="-" \
 --partition 1

Thus, this functionality does exist, but looking at various Kafka clients you would appear to be wanting to find an 'assign' method; this is not something that manifests as a consumer property.

I suppose then, that there are a few ways I might look to solve this problem myself:

  1. first I would reconsider if this problem is being approached the most natural way in Kafka (ie, using consumer groups) [but its not unlikely you have a particular need, so I'll not judge]
  2. alter the logstash-input-kafka module (or make a new one) that adds the option you need. The code you will need to modify would appear to be kafka.rb L246-L252.
  3. create a little tool (even just kafka-console-consumer, or kafka-cat) that gets the data you want, and you read this into logstash using logstash-input-pipe

Hope that helps,

Hi Cameron,

Thank you for the very thorough response and apologies for the late response. I agree that the single-partition approach doesn't seem like the most natural set-up but unfortunately I don't have control over that part of our environment. I didn't know about the logstash-input-pipe, it sounds like exactly what I need for my usecase in conjunction with a custom kafka-console-consumer command. I'll give it a try tomorrow!

Kind regards,

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.