Hi all,
I am using a Logstash instance to mirror data between two Kafka clusters. Below is my current configuration:
Logstash pipeline:
input {
  kafka {
    id => "kafka-reader"
    bootstrap_servers => "INPUT_KAFKA:9092"
    topics => ["TOPIC"]
    auto_offset_reset => "latest"
    group_id => "logstash-consumer"
    client_id => "iapp504-kafka_to_kafka_1-id1"
    max_poll_records => "1"
    max_poll_interval_ms => "200000"
    codec => "json"
  }
}
output {
  kafka {
    bootstrap_servers => "OUTPUT_KAFKA:9092"
    codec => json
    topic_id => "TOPIC"
    batch_size => 5000
    request_timeout_ms => "120000"
  }
}
logstash.yml:
pipeline.workers: 1
pipeline.output.workers: 1
pipeline.batch.size: 1000
pipeline.batch.delay: 5
path.config: null
dead_letter_queue.enable: true
The problem is that I keep getting timeout exceptions from the Kafka producer:
[2019-11-05T09:19:22,001][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for TOPIC-28: 120087 ms has passed since last append {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for TOPIC-28: 120087 ms has passed since last append}
I am also seeing the following error in the log:
TOPIC-17=OffsetAndMetadata{offset=585890, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
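For reference, the input already uses max_poll_records => "1", so I read the error's suggestion as pointing at the timeout settings on the consumer side. Something like the following is what I understand it to mean (the values are only illustrative, I haven't settled on numbers):

input {
  kafka {
    id => "kafka-reader"
    bootstrap_servers => "INPUT_KAFKA:9092"
    topics => ["TOPIC"]
    auto_offset_reset => "latest"
    group_id => "logstash-consumer"
    client_id => "iapp504-kafka_to_kafka_1-id1"
    max_poll_records => "1"
    max_poll_interval_ms => "600000"   # illustrative only: more headroom between poll() calls
    session_timeout_ms => "60000"      # illustrative only: longer session before a rebalance is triggered
    codec => "json"
  }
}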
On the producer side, I've tried increasing request_timeout_ms and decreasing the batch size, but it doesn't seem to help.
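Roughly, what I tried on the output side looked like this (the exact values varied between attempts, these are just examples):

output {
  kafka {
    bootstrap_servers => "OUTPUT_KAFKA:9092"
    codec => json
    topic_id => "TOPIC"
    batch_size => 1000                 # example of a reduced producer batch size
    request_timeout_ms => "300000"     # example of an increased request timeout
  }
}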
The result is a constant consumer lag of ~250k messages, so everything is processed with a delay.
Is there any additional setting I could apply to resolve this?
Appreciate your help.