Kafka Input performance / config tuning


I have a problem with the kafka input plugin.

After a few minutes I get the following log message, and then it stops indexing.

WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto offset commit failed for group kafka-es-sink: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

I tried to adjust the session timeout (to 30000) and max poll records (to 250).

The topic produces 1000 events per second in Avro format. There are 10 partitions (2 servers) and two Logstash instances with 5 consumer threads each.

I have no problems with other topics with ~100-300 events per second.

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    client_id => "client1"
    group_id => "group1"
    topics => ["topic1"]
    session_timeout_ms => "30000"
    max_poll_records => "250"
    consumer_threads => 5
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => avro {
      schema_registry_uri => "http://localhost:8081"
    }
  }
}

output {
  elasticsearch {
    hosts => ["es1:9200", "es2:9200"]
    index => "topic1"
    document_type => "default"
    manage_template => false
  }
}

I think it must be a config issue, because I also have a second connector between Kafka and ES on the same topic which works fine (Confluent's kafka-connect-elasticsearch).
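
For reference, the two settings named in the warning map onto kafka input options roughly as below. This is only a sketch: the values are illustrative assumptions, not tested settings. With 10 partitions spread over 10 consumer threads, each thread sees about 100 events per second if the load is evenly distributed, so a batch of 250 records is roughly 2.5 seconds of traffic. According to the warning, that batch has to be processed between two poll() calls within the 30 second session timeout, which leaves about 120 ms per event on average.

```
input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    group_id => "group1"
    topics => ["topic1"]
    consumer_threads => 5
    # Smaller batches mean less work between two poll() calls.
    # 100 is an illustrative value, not a recommendation.
    max_poll_records => "100"
    # Time the group coordinator waits before it rebalances.
    session_timeout_ms => "30000"
    # The Kafka docs suggest keeping this at no more than one third
    # of the session timeout; 3000 is used here as an example.
    heartbeat_interval_ms => "3000"
  }
}
```

If the warning still shows up with small batches, the time is probably being spent downstream of the input (decoding or the elasticsearch output), but that is a guess and would need to be measured.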

The main aim is to compare Kafka Connect and Logstash as connectors. Does anyone have general experience with this?



I am working on the Elasticsearch connector from Confluent to send batch data from a Kafka topic to an Elasticsearch index. I just want to know which of the 2 options below is the best way to do that:

  1. Elasticsearch connector from Kafka topic to Elasticsearch index
  2. Kafka topic - Logstash - Elasticsearch

Also, please let me know how to use the connector if filters and transformations need to be applied to the data before it goes into the Elasticsearch index.

Hoping for your reply very soon. Thank you in advance.


Hi Abhinav,

in general there are multiple ways to get data from kafka to elasticsearch.

  • kafka topic (structured data) -> kafka connect (elasticsearch connector) -> elasticsearch
  • kafka topic (structured data) -> logstash (kafka input, elasticsearch output) -> elasticsearch

kafka connect is basically designed to transfer the data from a topic to another data sink as it is, without data processing. in my setup i get better results with the confluent kafka connector, but i think i have a config issue with the logstash kafka input plugin (see the question in my post).

if the data must be transformed/filtered before indexing you also have multiple options. you can do the data processing with logstash or, which is what i would suggest, with kafka streams.

  • kafka topic (raw data) -> kafka streams -> kafka topic (structured data) -> kafka connect -> elasticsearch
  • kafka topic -> logstash (kafka input, filters, elasticsearch output) -> elasticsearch

with kafka streams i measured better performance results for the data processing part and it is fully integrated within a kafka cluster.
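
the logstash variant from the list above could look roughly like the sketch below. the field names (level, ts, event_time, internal_id) are made up for illustration, and decoding of the message payload is left out, so treat it as an outline rather than a working pipeline.

```
input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    group_id => "group1"
    topics => ["topic1"]
  }
}

filter {
  # hypothetical field: discard debug events before indexing
  if [level] == "DEBUG" {
    drop { }
  }
  mutate {
    # hypothetical fields: rename one, remove another
    rename => { "ts" => "event_time" }
    remove_field => ["internal_id"]
  }
}

output {
  elasticsearch {
    hosts => ["es1:9200", "es2:9200"]
    index => "topic1"
  }
}
```

the filter block is where all transformation happens, so every extra filter adds to the time spent per event before the output is reached.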

Hi Michael,

First of all thank you for responding!!

We have to apply some filters and transformations in our pipeline before putting data into Elasticsearch, so which of the two methods you described above would yield better performance when we use filters? I am guessing that writing filters in Logstash is much easier than writing Kafka Streams code, right? And what about the performance?

