Kafka input not working - no messages processed but offsets are updated to end


#1

I'm running logstash 6.6.0 with kafka input in a docker image. However it is not processing any events while the consumer group offsets are updated to the latest in kafka. Even when I manually reset the offsets to the beginning.

I've tried multiple configurations and tried to find answers online. But there was nothing that could solve the issues I'm experiencing.

Please see logging and configuration below. As you can see the offset for the consumer starts at 0. But after launching logstash there is no more lag. The stdout output didn't show any of the messages unfortunately. Any help is most welcome!

Consumer groups before launching Logstash

TOPIC                           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
resource-infrastructure-staging 2          0               81              81              -               -               -
resource-infrastructure-staging 0          0               63              63              -               -               -
resource-infrastructure-staging 1          0               63              63              -               -               -

Latest version of logstash config

input {
  kafka {
    type => "eventdata"
    id => "kafka-2"
    bootstrap_servers => "<host>:9092"
    client_id => "logstash"
    group_id => "logstash"
    auto_offset_reset => "earliest"
    enable_auto_commit => "false"
    topics => ["resource-infrastructure-staging"]
    codec => "json"
    consumer_threads => 1
    max_poll_records => "10"
  }
}

filter { }

output {
if [type] == "eventdata" {
    stdout { }
    elasticsearch {
      hosts => [ "<host>:9200" ]
      index => "events-%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][consumer_group]}-%{+YYYY.MM.dd}"
    }
  }
}

Consumer groups after launching logstash

TOPIC                           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
resource-infrastructure-staging 0          63              63              0               logstash-0-4ede6b58-950b-4dcf-9587-a8937e55b8cf /<host> logstash-0
resource-infrastructure-staging 1          63              63              0               logstash-0-4ede6b58-950b-4dcf-9587-a8937e55b8cf /<host> logstash-0
resource-infrastructure-staging 2          81              81              0               logstash-0-4ede6b58-950b-4dcf-9587-a8937e55b8cf /<host>  logstash-0

All Logstash Kafka logging

[2019-02-12T16:58:46,713][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.1.0
[2019-02-12T16:58:46,713][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : eec43959745f444f
[2019-02-12T16:58:47,381][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: W-4kqDnHRI6-jvwlXuRmjA
[2019-02-12T16:58:47,385][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Discovered group coordinator 10.11.33.203:9092 (id: 2147483646 rack: null)
[2019-02-12T16:58:47,396][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Revoking previously assigned partitions []
[2019-02-12T16:58:47,400][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] (Re-)joining group
[2019-02-12T16:58:50,472][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Successfully joined group with generation 23
[2019-02-12T16:58:50,475][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [resource-infrastructure-staging-0, resource-infrastructure-staging-1, resource-infrastructure-staging-2]

#2

If you make the stdout {} output unconditional, what does an event look like?


#3

Ah, a good night sleep and your comment helped a lot :slight_smile:
my data already had a type field in json, so the if indeed didn't work. Thanks!


(system) closed #4

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.