Kafka input unable to create consumer after setting session timeout

I'm trying to set up Logstash (5.0.0-beta1) to process events from a Kafka (0.10.0.0) topic and index them into a fairly busy Elasticsearch cluster (we've been using Logstash and Elasticsearch for a while; Kafka is the new part).

    kafka {
        bootstrap_servers => "myhost:9092"
        topics => ["test"]
        group_id => "test-1"
        enable_auto_commit => true
        codec => "json"
    }

It basically works, but sometimes (presumably when it takes a while to get the events into Elasticsearch) I get a series of warnings like:

    [2016-09-30T14:53:46,102][WARN ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Auto offset commit failed for group test-1: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    [2016-09-30T14:53:46,102][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Revoking previously assigned partitions [test-0] for group test-1
    [2016-09-30T14:53:46,102][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (Re-)joining group test-1
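The warning offers two remedies: raise the session timeout, or shrink the batches returned by poll() via max.poll.records. The latter appears to map to the kafka input's max_poll_records option, presumably something like this (the value is just illustrative, I haven't tuned it):

    max_poll_records => "500"   # smaller batches mean more frequent poll() calls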

When this happens, events seem to stop flowing (I suspect the consumer keeps reverting to the last committed offset). I then tried the other remedy, increasing the session timeout in the Logstash Kafka input config:

            session_timeout_ms => "60000"

and then I get:

    [2016-09-28T18:21:41,649][ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer}

This seems like an issue with the kafka input, but I wanted to make sure I didn't miss something. Also, could anyone familiar with the kafka input suggest some settings to tune?

Thanks,
Kevin

+1, it's happened in our environment too (logstash-5.0.2-1)

Kevin,

Have you solved this? I'm having the same issue, and changing the timeout/poll size doesn't help much. Here is my consumer config from the Logstash log:

    [2017-02-02T12:29:20,239][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [sd-0290-5067:9292, sd-5ab8-74bb:9292, sd-61ef-40af:9292, sd-6a71-7086:9292, sd-8e71-dba0:9292]
        ssl.keystore.type = JKS
        enable.auto.commit = true
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id = logstash
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 10000
        check.crcs = true
        request.timeout.ms = 100000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        group.id = logstash
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = latest

Sorry I didn't update this; I found a GitHub issue that explained the problem I was having: session_timeout_ms must be less than request_timeout_ms, which defaults to 40s (https://github.com/logstash-plugins/logstash-input-kafka/issues/114).
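So a working version of my original config would presumably look something like this (a sketch with illustrative values; the only hard requirement is the inequality between the two timeouts):

    kafka {
        bootstrap_servers => "myhost:9092"
        topics => ["test"]
        group_id => "test-1"
        codec => "json"
        session_timeout_ms => "60000"   # raised to survive slow downstream indexing
        request_timeout_ms => "70000"   # must stay above session_timeout_ms (default is 40000)
    }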

rq2016: looks like you're having a different problem, sorry.
