Can't get Logstash to read existing Kafka topic from start

I'm trying to consume a Kafka topic using Logstash, for indexing by Elasticsearch. The Kafka events are JSON documents.

We recently upgraded our Elastic Stack to 5.1.2.

I believe I was able to consume the topic in 5.0 using the same settings, but that was a while ago, so perhaps I'm doing something wrong now that I can't see. This is my config (slightly sanitized):

input {
    kafka {
        bootstrap_servers => "host1:9092,host2:9092,host3:9092"
        client_id => "logstash-elastic-5-c5"
        group_id => "logstash-elastic-5-g5"
        topics => "trp_v1"
        auto_offset_reset => "earliest"
        # enable_auto_commit => "false"
    }
}

filter {

    json {
        source => "message"
    }

    mutate {

        rename => { "@timestamp" => "indexedDatetime" }

        remove_field => [
            "@timestamp",
            "@version",
            "message"
        ]
    }
}

output {
    ##stdout { codec => plain }
    stdout { codec => rubydebug }

    elasticsearch {
        hosts => ["host10:9200", "host11:9200", "host12:9200", "host13:9200"]
        action => "index"
        index => "trp-i"
        document_type => "event"
    }
}

When I run this, no messages are consumed and no sign of activity appears in the log after "[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Setting newly assigned partitions". In Kafka Manager, the consumer immediately shows "total lag = 0" for the topic.

This version of the Kafka plugin stores consumer offsets in Kafka itself, so each time I run Logstash against the same topic I increment the group_id. In theory, the new group has no stored offsets, so it should start from the earliest offset for the topic.

Any advice?

EDIT: It appears that despite setting auto_offset_reset to "earliest", it isn't working - it's as if it's being set to "latest". I left Logstash running, then had more entries loaded into the Kafka queue, and they were processed by Logstash. Are my settings wrong in some way? Or could this be a bug?

Hi,

Although I'm a long-time user of Elasticsearch, I am only now starting to use Logstash. Did you ever get an answer to this issue?

Sorry it looks like we missed this one.

See https://www.elastic.co/blog/just-enough-kafka-for-the-elastic-stack-part1
and https://www.elastic.co/blog/just-enough-kafka-for-the-elastic-stack-part2

In particular, the section called "Offset Management and Message Delivery Guarantees" says that Zookeeper stores the last committed offset of a consumer. Meaning, I suppose, that if you want to replay everything you need to use some Kafka magic to reset the offset in ZK to the beginning.

@crickes, while we had upgraded to 5.1.2, it turned out that I was still using Logstash 5.0.0. That was upgraded and the apparent problem of it not consuming until new events were added to the topic seemed to be solved - though I'm not sure exactly how.

@guyboertje, thanks for the reply. We're using a version of Logstash where the Kafka input plugin stores offsets in Kafka rather than Zookeeper, so it appears you can't replay an entire queue without using a previously-unused consumer group ID?
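A minimal sketch of that replay approach, reusing the option names from the config in the original post. Kafka only consults auto_offset_reset when the group has no committed offset (or the committed offset is out of range), so a previously unused group_id is what makes the consumer start from the earliest retained offset. The "-replay1" group name below is hypothetical.

```
input {
    kafka {
        bootstrap_servers => "host1:9092,host2:9092,host3:9092"
        topics => "trp_v1"
        # Hypothetical, previously unused group: it has no committed offsets yet.
        group_id => "logstash-elastic-5-replay1"
        # Only takes effect because the group above has no committed offset.
        auto_offset_reset => "earliest"
    }
}
```

Restarting with the same group_id resumes from that group's committed offsets rather than from the start of the topic.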

I was still getting much less data from the topic than I was with Logstash 2.4; I then realised there were these errors:

[2017-02-22T00:00:26,372][WARN ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Auto offset commit failed for group logstash-elastic-512-g1: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

It appeared that it would get into a state where it would process a block of events, fail to commit the offsets, and then reprocess the same block of events, causing the total number of documents to stall (while the number of deleted documents increased). I reduced max_poll_records to 1000 and it helped a lot, but I still get this error now, and even reducing max_poll_records to 1 doesn't eliminate it. I don't understand how such low values could still cause a timeout against the 30s default session timeout.
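For reference, the settings named in that warning map onto Kafka input options roughly as in the sketch below. The option names and the string-typed values are assumed from the 5.x Kafka input plugin, and the numbers are illustrative placeholders rather than recommendations, so check them against the documentation for the installed plugin version.

```
input {
    kafka {
        bootstrap_servers => "host1:9092,host2:9092,host3:9092"
        group_id => "logstash-elastic-512-g1"
        topics => "trp_v1"
        # Fewer records per poll() means less processing time between polls.
        max_poll_records => "500"
        # Maximum time between polls before the group rebalances (illustrative value).
        session_timeout_ms => "30000"
        # Conventionally no more than a third of the session timeout.
        heartbeat_interval_ms => "10000"
        # The Kafka consumer requires this to be larger than the session timeout.
        request_timeout_ms => "40000"
    }
}
```

Raising session_timeout_ms only works within the range the brokers allow (group.min.session.timeout.ms to group.max.session.timeout.ms), so a larger value may also need a broker-side change.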

I can also see these errors after the job has been running for a few days:

[2017-02-22T00:02:09,456][WARN ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Auto offset commit failed for group logstash-elastic-512-g1: Commit offsets failed with retriable exception. You should retry committing offsets.

[2017-02-22T16:13:51,134][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 429 ({"type"=>"es_rejected_execution_exception", "reason"=>"rejected execution of org.elasticsearch.transport.TransportService$6@1240229b on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@25fc9a2a[Running, pool size = 6, active threads = 6, queued tasks = 50, completed tasks = 2330613]]"})

[2017-02-22T16:13:51,155][ERROR][logstash.outputs.elasticsearch] Action

It's still indexing, but I'm concerned about missing events; it's quite difficult to reconcile what's in ES vs what's in the topic.

Any advice?

Thanks again!
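On the reconciliation concern above, one option is to make redelivery idempotent by deriving the Elasticsearch document ID from each record's Kafka coordinates, so that a reprocessed block overwrites the same documents instead of adding new ones. A rough sketch, assuming decorate_events places topic, partition, and offset under a [kafka] field (believed to be the 5.x plugin behaviour; newer plugin versions are understood to use [@metadata][kafka] instead, so the field paths are an assumption to verify):

```
input {
    kafka {
        bootstrap_servers => "host1:9092,host2:9092,host3:9092"
        group_id => "logstash-elastic-512-g1"
        topics => "trp_v1"
        # Adds Kafka metadata (topic, partition, offset, ...) to each event.
        decorate_events => true
    }
}

output {
    elasticsearch {
        hosts => ["host10:9200", "host11:9200", "host12:9200", "host13:9200"]
        index => "trp-i"
        document_type => "event"
        # Assumed field paths; the same Kafka record always maps to the same ID.
        document_id => "%{[kafka][topic]}-%{[kafka][partition]}-%{[kafka][offset]}"
    }
}
```

With IDs built this way, the offsets present in the index can be compared per partition against the topic's offset range, which should make gaps easier to spot.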

@guyboertje @ankh Thanks. Soon after posting this I realised what was going on. The last time I used Logstash (2 years ago), I think it was Logstash (or any other client) that was responsible for tracking where it was up to in the topic feed. Now that you can have multiple Logstash instances in a consumer group spread around the place, they need to use Kafka or Zookeeper to store the position. If a single Logstash instance were to request a rewind for that consumer group, it would of course affect the other members of that group, which may not be what you intended. So it looks like we will have to find some way to modify the group pointer in Zookeeper if we need the group to replay an entire topic.

@talevy, @suyograo

WDYT about these posts above?

The blog posts above are informative. I think what is lacking is a good solution for managing Kafka, which is not Elastic's problem. We have tried to use https://github.com/yahoo/kafka-manager but found it to be very unstable. It hasn't been updated in a while, and I'm not sure whether it even supports the latest version of Kafka.

It might be helpful if the Logstash documentation for the Kafka input plugin referenced these posts, or at least provided a better explanation of how the offset is now managed.

The input plugin could be modified to allow re-seeking to the earliest offset, but the recommended approach is to manage the group's offsets using Kafka's own utilities.
