Kafka INVALID_FETCH_SESSION_EPOCH

It appears this has been asked more than once and never answered... is anyone from @elastic / @logstash able to help?

{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1561500837923,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Node 2 was unable to process the fetch request with (sessionId=868661509, epoch=10): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1561500849251,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Node 2 was unable to process the fetch request with (sessionId=240341971, epoch=11): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1561500876421,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Node 1 was unable to process the fetch request with (sessionId=321926453, epoch=41): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1561500882513,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Node 1 was unable to process the fetch request with (sessionId=543232444, epoch=3): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"WARN","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561501279416,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."}} 

I know my log retention is 3 days, so this shouldn't be a case of the messages having been deleted before the consumer could fetch them.

The consumer is set up as follows:

input {
  kafka {
    bootstrap_servers => "broker1:9094,broker2:9094,broker3:9094"
    topics_pattern => "(nonprod|wip|dev|qa)-logs-.*"
    metadata_max_age_ms => 60000
    group_id => "devgilogs"
    client_id => "rndlogstash1"
    auto_offset_reset => "earliest"
    decorate_events => true
    security_protocol => "SSL"
    ssl_truststore_location => "/etc/pki/java/cacerts"
  }
}

I updated the input with the following settings:

session_timeout_ms => "10000"
max_poll_records => "550"
max_poll_interval_ms => "300000"
fetch_min_bytes => "1"
request_timeout_ms => "305000"
consumer_threads => 4
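For what it's worth, two relationships commonly checked with these values (thresholds from general Kafka consumer guidance, not from your brokers): request.timeout.ms should exceed max.poll.interval.ms, and the per-record budget implied by max.poll.interval.ms / max.poll.records should comfortably exceed your pipeline's real processing time. A quick sanity check on the numbers above:

```python
# Sanity-check the updated input settings against common Kafka guidance.
settings = {
    "session_timeout_ms": 10_000,
    "max_poll_records": 550,
    "max_poll_interval_ms": 300_000,
    "request_timeout_ms": 305_000,
}

# request.timeout.ms larger than max.poll.interval.ms, so a long poll
# gap doesn't also trip the request timeout. Holds here: 305000 > 300000.
assert settings["request_timeout_ms"] > settings["max_poll_interval_ms"]

# Average per-record budget implied by these two settings:
budget_ms = settings["max_poll_interval_ms"] / settings["max_poll_records"]
print(f"~{round(budget_ms)} ms per record")
```

So the timeouts are internally consistent; the repeated LeaveGroup warnings suggest the poll loop is still genuinely stalling for minutes, not a misconfigured threshold.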

Doesn't appear to have helped:

{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","timeMillis":1561505389157,"thread":"Ruby-0-Thread-67: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-1, groupId=devgilogs] Setting newly assigned partitions [...]"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","timeMillis":1561505389156,"thread":"Ruby-0-Thread-66: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Setting newly assigned partitions [...]"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","timeMillis":1561505389157,"thread":"Ruby-0-Thread-68: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] Setting newly assigned partitions [...]"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1561505406985,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-1, groupId=devgilogs] Node 1 was unable to process the fetch request with (sessionId=311148438, epoch=16): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"WARN","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505743844,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-1, groupId=devgilogs] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505743858,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-1, groupId=devgilogs] Sending LeaveGroup request to coordinator broker3:9094 (id: 2147483644 rack: null)"}}
{"level":"WARN","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744149,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744150,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] Sending LeaveGroup request to coordinator broker3:9094 (id: 2147483644 rack: null)"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744336,"thread":"Ruby-0-Thread-69: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-3, groupId=devgilogs] Attempt to heartbeat failed since group is rebalancing"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","timeMillis":1561505744349,"thread":"Ruby-0-Thread-69: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-3, groupId=devgilogs] Revoking previously assigned partitions []"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744349,"thread":"Ruby-0-Thread-69: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-3, groupId=devgilogs] (Re-)joining group"}}
{"level":"WARN","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744542,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744542,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Sending LeaveGroup request to coordinator broker3:9094 (id: 2147483644 rack: null)"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","timeMillis":1561505744572,"thread":"Ruby-0-Thread-69: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-3, groupId=devgilogs] Setting newly assigned partitions [...]"}}

Any suggestions to fix this?

» curl -s localhost:8080/v3/kafka/gw/consumer/devgilogs/lag | jq '.status.partitions[].status' | sort | uniq -c
  21 "OK"
  24 "STOP"

Still having these issues. Any suggestions?

input {
  kafka {
    bootstrap_servers => "broker1:9094,broker2:9094,broker3:9094"
    topics_pattern => "(nonprod|wip|dev|qa)-logs-.*"
    metadata_max_age_ms => 60000
    group_id => "devgilogs"
    client_id => "rndlogstash1"
    auto_offset_reset => "earliest"
    decorate_events => true
    security_protocol => "SSL"
    ssl_truststore_location => "/etc/pki/java/cacerts"
    session_timeout_ms => "10000"
    max_poll_records => "350"
    max_poll_interval_ms => "300000"
    fetch_min_bytes => "1"
    request_timeout_ms => "305000"
    consumer_threads => 4
  }
}
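One thing worth checking with consumer_threads => 4: every Logstash instance joins the group with four consumers, and any consumers beyond the total partition count of the matched topics sit idle, while each join/leave forces a full group rebalance. A hedged back-of-envelope check (instance count is a placeholder; 45 partitions is taken from the 21 OK + 24 STOP above):

```python
# Hypothetical topology numbers -- replace with your actual deployment.
logstash_instances = 1
consumer_threads = 4     # from the input config above
total_partitions = 45    # 21 "OK" + 24 "STOP" partitions from the lag check

consumers = logstash_instances * consumer_threads
idle = max(0, consumers - total_partitions)
print(f"{consumers} consumers for {total_partitions} partitions, {idle} idle")
```

With one instance this is fine (4 consumers, 45 partitions), but if several instances share the group, oversubscription plus slow processing amplifies the rebalance churn in the logs.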

Any ideas why it's still dropping out of the group, though now after what appears to be more like 24-48 hours rather than a few hours?

