I updated the input with the following settings:
session_timeout_ms => "10000"
max_poll_records => "550"
max_poll_interval_ms => "300000"
fetch_min_bytes => "1"
request_timeout_ms => "305000"
consumer_threads => 4
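As a rough sanity check on these values, the sketch below works out the average per-record time budget they imply. It assumes that each consumer thread runs its own consumer and that every poll() returns a full batch of 550 records; the function name is hypothetical and only used for illustration.

```python
def per_record_budget_ms(max_poll_interval_ms: int, max_poll_records: int) -> float:
    """Average time available per record before the poll timeout expires,
    assuming every poll() returns a full batch (an assumption, not a given)."""
    return max_poll_interval_ms / max_poll_records


# Values taken from the settings above.
budget = per_record_budget_ms(300_000, 550)
print(f"{budget:.1f} ms per record")  # prints: 545.5 ms per record
```

If the pipeline takes longer than that per event on average, or stalls for some other reason, the consumer would be expected to miss the max_poll_interval_ms deadline.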
These settings don't appear to have helped; the consumers still leave the group once the poll timeout expires:
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","timeMillis":1561505389157,"thread":"Ruby-0-Thread-67: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-1, groupId=devgilogs] Setting newly assigned partitions [...]"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","timeMillis":1561505389156,"thread":"Ruby-0-Thread-66: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Setting newly assigned partitions [...]"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","timeMillis":1561505389157,"thread":"Ruby-0-Thread-68: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] Setting newly assigned partitions [...]"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1561505406985,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-1, groupId=devgilogs] Node 1 was unable to process the fetch request with (sessionId=311148438, epoch=16): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"WARN","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505743844,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-1, groupId=devgilogs] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505743858,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-1, groupId=devgilogs] Sending LeaveGroup request to coordinator broker3:9094 (id: 2147483644 rack: null)"}}
{"level":"WARN","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744149,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744150,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] Sending LeaveGroup request to coordinator broker3:9094 (id: 2147483644 rack: null)"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744336,"thread":"Ruby-0-Thread-69: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-3, groupId=devgilogs] Attempt to heartbeat failed since group is rebalancing"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","timeMillis":1561505744349,"thread":"Ruby-0-Thread-69: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-3, groupId=devgilogs] Revoking previously assigned partitions []"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744349,"thread":"Ruby-0-Thread-69: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-3, groupId=devgilogs] (Re-)joining group"}}
{"level":"WARN","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744542,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","timeMillis":1561505744542,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Sending LeaveGroup request to coordinator broker3:9094 (id: 2147483644 rack: null)"}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","timeMillis":1561505744572,"thread":"Ruby-0-Thread-69: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-3, groupId=devgilogs] Setting newly assigned partitions [...]"}}
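To put numbers on the log above, the following sketch computes how long each consumer stayed in the group, using the timeMillis values of the "Setting newly assigned partitions" and "consumer poll timeout has expired" entries. The timestamps are copied from the log; the variable names are only illustrative.

```python
MAX_POLL_INTERVAL_MS = 300_000  # max_poll_interval_ms from the settings

# (assigned, poll timeout expired) timeMillis pairs per client, copied from the log.
events = {
    "rndlogstash1-0": (1561505389156, 1561505744542),
    "rndlogstash1-1": (1561505389157, 1561505743844),
    "rndlogstash1-2": (1561505389157, 1561505744149),
}

# Milliseconds between partition assignment and the poll-timeout warning.
elapsed = {client: expired - assigned for client, (assigned, expired) in events.items()}

for client, ms in elapsed.items():
    over = ms - MAX_POLL_INTERVAL_MS
    print(f"{client}: {ms / 1000:.1f} s after assignment, "
          f"{over / 1000:.1f} s beyond max_poll_interval_ms")
```

All three consumers left roughly 355 s after assignment. That would be consistent with their last poll() happening about 55 s after the assignment, followed by a full 300 s without another poll, although this is an inference from the timestamps rather than something the log states directly.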
Any suggestions on how to fix this? The lag status for the consumer group currently reports 24 of its 45 partitions as STOP:
» curl -s localhost:8080/v3/kafka/gw/consumer/devgilogs/lag | jq '.status.partitions[].status' | sort | uniq -c
21 "OK"
24 "STOP"