Error connecting Kafka with Logstash

OS: Rocky Linux 8.5
Logstash: logstash-7.16.2

Hi, I'm trying to start Logstash with Kafka as the input. Here is the config:

input {
  kafka {
    bootstrap_servers => "xx.xx.xxx.xx:9092"
    topics => ["ext_device-control-events_10121","ext_device-event_10121","ext_device-measurement_10121","ext_device-parameter_10121","ext_device-telemetry_10121","ext_device-topology_10121","ext_device_10121","ext_incoming-device-events_10121","ext_master-data-events_10121","ext_metering-point_10121","ext_process-events_10121","ext_register-statistic_10121","ext_device-process_10121","ext_device_measurement_10121","ext_device_measurment_10121","ext_deviceevent_10121","ext_mdus-events_10121","ext_operational-process_10121","ext_registerstatistic_10121"]
    codec => "json"
    group_id => "logstash-consumers-1"
    consumer_threads => 1
    decorate_events => true
    auto_offset_reset => "earliest"
    add_field => {
      "host" => "ZONOS IPv4"
    }
  }
}

output {
  file {
    path => "/home/avs/zonos_all_ipv4.out"
  }
}

And I'm getting this error:

Using bundled JDK: /opensearch/logstash-7.16.2/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to /opensearch/logstash/data which is now configured via log4j2.properties
[2022-01-20T09:13:39,166][INFO ][logstash.runner          ] Log4j configuration path used is: /opensearch/logstash-7.16.2/config/log4j2.properties
[2022-01-20T09:13:39,183][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.16.2", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.13+8 on 11.0.13+8 +indy +jit [linux-x86_64]"}
[2022-01-20T09:13:39,654][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2022-01-20T09:13:40,757][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2022-01-20T09:13:42,178][INFO ][org.reflections.Reflections] Reflections took 118 ms to scan 1 urls, producing 119 keys and 417 values 
[2022-01-20T09:13:43,594][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["/home/avs/ipv4.conf"], :thread=>"#<Thread:0x6e4b890a run>"}
[2022-01-20T09:13:44,526][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.93}
[2022-01-20T09:13:44,576][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2022-01-20T09:13:44,763][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig][main][8db2da5c859096998b18259059e66d18a29852e8d541fe9ab7e02d14b3f23621] ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [65.1.48.193:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id = logstash-0
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = logstash-consumers-1
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 50
        reconnect.backoff.ms = 50
        request.timeout.ms = 40000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2022-01-20T09:13:44,812][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2022-01-20T09:13:44,917][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][8db2da5c859096998b18259059e66d18a29852e8d541fe9ab7e02d14b3f23621] Kafka version: 2.5.1
[2022-01-20T09:13:44,917][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][8db2da5c859096998b18259059e66d18a29852e8d541fe9ab7e02d14b3f23621] Kafka commitId: 0efa8fb0f4c73d92
[2022-01-20T09:13:44,917][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][8db2da5c859096998b18259059e66d18a29852e8d541fe9ab7e02d14b3f23621] Kafka startTimeMs: 1642688024913
[2022-01-20T09:13:44,927][INFO ][org.apache.kafka.clients.consumer.KafkaConsumer][main][8db2da5c859096998b18259059e66d18a29852e8d541fe9ab7e02d14b3f23621] [Consumer clientId=logstash-0, groupId=logstash-consumers-1] Subscribed to topic(s): ext_device-control-events_10121, ext_device-event_10121, ext_device-measurement_10121, ext_device-parameter_10121, ext_device-telemetry_10121, ext_device-topology_10121, ext_device_10121, ext_incoming-device-events_10121, ext_master-data-events_10121, ext_metering-point_10121, ext_process-events_10121, ext_register-statistic_10121, ext_device-process_10121, ext_device_measurement_10121, ext_device_measurment_10121, ext_deviceevent_10121, ext_mdus-events_10121, ext_operational-process_10121, ext_registerstatistic_10121
[2022-01-20T09:13:45,792][INFO ][org.apache.kafka.clients.Metadata][main][8db2da5c859096998b18259059e66d18a29852e8d541fe9ab7e02d14b3f23621] [Consumer clientId=logstash-0, groupId=logstash-consumers-1] Cluster ID: KDpznPsAQyKSR2RDb8FCXQ
[2022-01-20T09:13:45,795][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][8db2da5c859096998b18259059e66d18a29852e8d541fe9ab7e02d14b3f23621] [Consumer clientId=logstash-0, groupId=logstash-consumers-1] Discovered group coordinator ip-10-92-3-113.ap-south-1.compute.internal:9092 (id: 2147483647 rack: null)
[2022-01-20T09:13:45,883][WARN ][org.apache.kafka.clients.NetworkClient][main][8db2da5c859096998b18259059e66d18a29852e8d541fe9ab7e02d14b3f23621] [Consumer clientId=logstash-0, groupId=logstash-consumers-1] Error connecting to node ip-10-92-3-113.ap-south-1.compute.internal:9092 (id: 2147483647 rack: null)
java.net.UnknownHostException: ip-10-92-3-113.ap-south-1.compute.internal: Name or service not known
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[?:?]
        at java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:929) ~[?:?]
        at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1519) ~[?:?]
        at java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:848) ~[?:?]
        at java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[?:?]
        at java.net.InetAddress.getAllByName(InetAddress.java:1368) ~[?:?]
        at java.net.InetAddress.getAllByName(InetAddress.java:1302) ~[?:?]
        at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) ~[kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) ~[kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) ~[kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:572) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:798) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:778) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:236) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:469) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1274) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) [kafka-clients-2.5.1.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) [kafka-clients-2.5.1.jar:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:426) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:293) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:24) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:86) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207) [jruby-complete-9.2.20.1.jar:?]
        at opensearch.logstash_minus_7_dot_16_dot_2.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_8_dot_2_minus_java.lib.logstash.inputs.kafka.RUBY$method$do_poll$0(/opensearch/logstash-7.16.2/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.8.2-java/lib/logstash/inputs/kafka.rb:328) [jruby-complete-9.2.20.1.jar:?]
        at opensearch.logstash_minus_7_dot_16_dot_2.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_8_dot_2_minus_java.lib.logstash.inputs.kafka.RUBY$method$do_poll$0$__VARARGS__(/opensearch/logstash-7.16.2/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.8.2-java/lib/logstash/inputs/kafka.rb:325) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207) [jruby-complete-9.2.20.1.jar:?]
        at opensearch.logstash_minus_7_dot_16_dot_2.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_8_dot_2_minus_java.lib.logstash.inputs.kafka.RUBY$block$thread_runner$1(/opensearch/logstash-7.16.2/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.8.2-java/lib/logstash/inputs/kafka.rb:313) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.runtime.Block.call(Block.java:139) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.RubyProc.call(RubyProc.java:318) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.javasupport.Java$ProcToInterface.callProc(Java.java:1136) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.javasupport.Java$ProcToInterface.access$300(Java.java:1113) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.javasupport.Java$ProcToInterface$ConcreteMethod.call(Java.java:1174) [jruby-complete-9.2.20.1.jar:?]
        at org.jruby.gen.InterfaceImpl160708917.run(org/jruby/gen/InterfaceImpl160708917.gen:13) [jruby-complete-9.2.20.1.jar:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
[2022-01-20T09:13:45,903][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][8db2da5c859096998b18259059e66d18a29852e8d541fe9ab7e02d14b3f23621] [Consumer clientId=logstash-0, groupId=logstash-consumers-1] (Re-)joining group
[2022-01-20T09:13:45,914][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][8db2da5c859096998b18259059e66d18a29852e8d541fe9ab7e02d14b3f23621] [Consumer clientId=logstash-0, groupId=logstash-consumers-1] Group coordinator ip-10-92-3-113.ap-south-1.compute.internal:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2022-01-20T09:13:45,919][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][8db2da5c859096998b18259059e66d18a29852e8d541fe9ab7e02d14b3f23621] [Consumer clientId=logstash-0, groupId=logstash-consumers-1] Join group failed with org.apache.kafka.common.errors.DisconnectException

I'm not sure what I'm doing wrong. The machine is reachable (ping works), and I checked with telnet that port 9092 is open.
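For what it's worth, ping and telnet only confirm that the bootstrap *IP address* is reachable. The UnknownHostException in the log is about something different: after connecting, the broker advertises itself under the hostname ip-10-92-3-113.ap-south-1.compute.internal, and the client machine cannot resolve that name. A minimal sketch to test name resolution separately from IP reachability (the helper function is just for illustration):

```python
import socket

def resolves(host: str) -> bool:
    """Return True if this machine can resolve `host` to an IP address."""
    try:
        socket.gethostbyname(host)
        return True
    except socket.gaierror:
        return False

# The bootstrap address works because it is a literal IP, but the name the
# broker advertises (taken from the stack trace above) is an AWS-internal
# hostname that may not resolve outside that VPC:
print(resolves("ip-10-92-3-113.ap-south-1.compute.internal"))
```

If this prints False, the fix is usually on the broker side (set advertised.listeners to a name or IP the client can reach) or on the client side (add a mapping in /etc/hosts or DNS).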
Please help.
Thanks!
