Logstash group imbalance due consumer thread!

I have below configuration in Logstash from kafka ,

input {
kafka {
bootstrap_servers => "server1 , server2 , server3"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
jaas_path => "/etc/logstash/kafka-client-jaas.conf"
topics => ["2018-07-19"]
auto_commit_interval_ms => "50000"
auto_offset_reset => "latest"
connections_max_idle_ms => "500000"
consumer_threads => 40
fetch_max_bytes => "209715200"
heartbeat_interval_ms => "100"
max_poll_records => "32760"
max_partition_fetch_bytes => "209715200"
receive_buffer_bytes => "80960"
send_buffer_bytes => "80960"
}
}

getting below errors , logstash is not able to start .

• [2018-07-19T14:04:47,707][WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-1
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_171]

• 2147483645 rack: null): (type=SyncGroupRequest, groupId=logstash, generationId=6, memberId=logstash-7-73e735e2-bbcd-444e-bfe4-8d4b9638dcd4, groupAssignment=)
[2018-07-19T14:04:47,728][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] SyncGroup for group logstash failed due to coordinator rebalance
[2018-07-19T14:04:47,729][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (Re-)joining group logstash
[2018-07-19T14:04:47,729][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] SyncGroup for group logstash failed due to coordinator rebalance

• [2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.clients.consumer.KafkaConsumer] Starting the Kafka consumer
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] SyncGroup for group logstash failed due to coordinator rebalance
[2018-07-19T14:04:47,733][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (Re-)joining group logstash
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Sending JoinGroup ((type: JoinGroupRequest, groupId=logstash, sessionTimeout=10000, rebalanceTimeout=300000, memberId=logstash-18-a2f310e5-0bc1-456a-a8b9-49cfc2f5d57a, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@3af70b27)) to coordinator vkafka03dsy:9092 (id: 2147483645 rack: null)
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.clients.Metadata] Updated cluster metadata version 1 to Cluster(id = null, nodes = [10.81.101.216:9092 (id: -3 rack: null), 10.81.101.226:9092 (id: -2 rack: null), 10.81.101.160:9092 (id: -1 rack: null)], partitions = [])
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name fetch-throttle-time
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name connections-closed:
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name connections-created:
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name bytes-sent-received:
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name bytes-sent:
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name bytes-received:
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name select-time:
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name io-time:
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name heartbeat-latency
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name join-latency
[2018-07-19T14:04:47,733][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name sync-latency
[2018-07-19T14:04:47,734][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name commit-latency
[2018-07-19T14:04:47,734][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name bytes-fetched
[2018-07-19T14:04:47,734][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name records-fetched
[2018-07-19T14:04:47,734][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name fetch-latency
[2018-07-19T14:04:47,734][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name records-lag
[2018-07-19T14:04:47,734][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 0.11.0.0
[2018-07-19T14:04:47,734][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : cb8625948210849f
[2018-07-19T14:04:47,734][WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-2
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_171]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_171]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_171]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_171]

• [2018-07-19T14:04:47,067][WARN ][logstash.pipeline ] CAUTION: Recommended inflight events max exceeded! Logstash will run with up to 12000 events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently 1000), or changing the number of pipeline workers (currently 12) {:pipeline_id=>"test1", :thread=>"#<Thread:0x60c23f31@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:245 run>"}

What could be the issue here ?

Please help in resolving this .

Most of that is just debug/info noise. There is one non-issue

	[2018-07-19T14:04:47,707][WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-1
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)

I would expect you to get this if you have two pipelines with kafka inputs with the same id. I ignore it where I get it.

The other warning is

[2018-07-19T14:04:47,067][WARN ][logstash.pipeline ] CAUTION: Recommended inflight events max exceeded! Logstash will run with up to 12000 events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently 1000), or changing the number of pipeline workers (currently 12) {:pipeline_id=>"test1", 

OK, so reduce your batch size or the number of threads. Neither of these actually prevent logstash starting, they are just warnings.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.