Logstash kafka input plugin not working

Hi,
I'm trying to use the Logstash Kafka input plugin to read messages and then send them to Elasticsearch.

My Logstash version is 8.6.2 and my Kafka version is 3.5.1:

root@logstash-02:~# /usr/share/logstash/bin/logstash --version
Using bundled JDK: /usr/share/logstash/jdk
logstash 8.6.2

kafka@kafka:~/kafka$ bin/kafka-topics.sh --version
3.5.1
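
Note that the Kafka client library comes bundled with the Logstash Kafka integration plugin (the stack trace below mentions kafka-clients-2.8.1.jar), not from the broker itself. The installed plugin version can be checked with something like this (path assumes the default package install):

/usr/share/logstash/bin/logstash-plugin list --verbose logstash-integration-kafka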

My Logstash config is the following:

input {
  kafka {
    bootstrap_servers => "192.168.40.211:9092"
    topics => ["test-topic"]
  }
}

filter {

}

# using the file output to test before switching to the elasticsearch output
output {
  file {
   path => "/tmp/kafka-to-logstash"
  }
}
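
For reference, the elasticsearch output I plan to switch to later would look something like this (the host and index name here are placeholders, not my real values):

output {
  elasticsearch {
    hosts => ["http://192.168.40.210:9200"]   # placeholder Elasticsearch host
    index => "kafka-logs-%{+YYYY.MM.dd}"      # placeholder index pattern
  }
}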

When starting Logstash, I keep getting an exception when it tries to connect to the Kafka host. Here are the Logstash logs:

[2023-08-16T19:52:23,160][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] Kafka commitId: 839b886f9b732b15
[2023-08-16T19:52:23,160][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] Kafka startTimeMs: 1692215543158
[2023-08-16T19:52:23,164][INFO ][org.apache.kafka.clients.consumer.KafkaConsumer][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Subscribed to topic(s): pni-fw
[2023-08-16T19:52:23,590][INFO ][org.apache.kafka.clients.Metadata][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Cluster ID: Swch43o1Q0GBUwmbPnMjhg
[2023-08-16T19:52:23,592][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Discovered group coordinator kafka:9092 (id: 2147483647 rack: null)
[2023-08-16T19:52:23,593][WARN ][org.apache.kafka.clients.NetworkClient][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Error connecting to node kafka:9092 (id: 2147483647 rack: null)
java.net.UnknownHostException: kafka: Temporary failure in name resolution
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[?:?]
        at java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:933) ~[?:?]
        at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1534) ~[?:?]
        at java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:852) ~[?:?]
        at java.net.InetAddress.getAllByName0(InetAddress.java:1524) ~[?:?]
        at java.net.InetAddress.getAllByName(InetAddress.java:1381) ~[?:?]
        at java.net.InetAddress.getAllByName(InetAddress.java:1305) ~[?:?]
        at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-2.8.1.jar:?]
        at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111) ~[kafka-clients-2.8.1.jar:?]
        at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:513) ~[kafka-clients-2.8.1.jar:?]
        at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467) ~[kafka-clients-2.8.1.jar:?]
        at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172) ~[kafka-clients-2.8.1.jar:?]
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985) [kafka-clients-2.8.1.jar:?]
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311) [kafka-clients-2.8.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:575) [kafka-clients-2.8.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:851) [kafka-clients-2.8.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:832) [kafka-clients-2.8.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) [kafka-clients-2.8.1.jar:?]
        at org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:116) [jruby.jar:?]
        at org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136) [jruby.jar:?]
        at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66) [jruby.jar:?]
        at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) [jruby.jar:?]
        at org.jruby.runtime.Block.call(Block.java:143) [jruby.jar:?]
        at org.jruby.RubyProc.call(RubyProc.java:309) [jruby.jar:?]
        at org.jruby.javasupport.Java$ProcToInterface.callProc(Java.java:1232) [jruby.jar:?]
        at org.jruby.javasupport.Java$ProcToInterface.access$300(Java.java:1209) [jruby.jar:?]
        at org.jruby.javasupport.Java$ProcToInterface$ConcreteMethod.call(Java.java:1270) [jruby.jar:?]
        at org.jruby.gen.InterfaceImpl1600050383.run(org/jruby/gen/InterfaceImpl1600050383.gen:13) [jruby.jar:?]
        at java.lang.Thread.run(Thread.java:833) [?:?]
[2023-08-16T19:52:23,602][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Group coordinator kafka:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted.
[2023-08-16T19:52:23,703][WARN ][org.apache.kafka.clients.NetworkClient][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Error connecting to node kafka:9092 (id: 0 rack: null)
java.net.UnknownHostException: kafka

It seems to me that it's trying to connect using the hostname kafka instead of the IP address I'm specifying in the input section. Am I understanding this correctly?

However, when running tcpdump on the Kafka host I can see incoming packets from the Logstash host, yet nothing is being read from the Kafka topic and nothing is written to the output file.

kafka@kafka:~/kafka$ sudo tcpdump src 192.168.40.136 and dst port 9092                                                                                  
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode                                                                              
listening on ens3, link-type EN10MB (Ethernet), capture size 262144 bytes                                                                               
                                                                                                                                       
19:35:39.751427 IP logstash-02.53754 > kafka.9092: Flags [S], seq 1794425554, win 32120, options [mss 1460,sackOK,TS val 1293466322 ecr 0,nop,wscale 0], length 0
19:35:39.751932 IP logstash-02.53754 > kafka.9092: Flags [.], ack 2806399270, win 32120, options [nop,nop,TS val 1293466323 ecr 1098280366], length 0
19:35:39.793194 IP logstash-02.53754 > kafka.9092: Flags [P.], seq 0:50, ack 1, win 32120, options [nop,nop,TS val 1293466364 ecr 1098280366], length 50
19:35:39.794830 IP logstash-02.53754 > kafka.9092: Flags [.], ack 447, win 31674, options [nop,nop,TS val 1293466366 ecr 1098280409], length 0
19:35:39.868479 IP logstash-02.53754 > kafka.9092: Flags [P.], seq 50:103, ack 447, win 31674, options [nop,nop,TS val 1293466439 ecr 1098280409], length 53
19:35:39.868876 IP logstash-02.53754 > kafka.9092: Flags [P.], seq 103:139, ack 447, win 31674, options [nop,nop,TS val 1293466440 ecr 1098280409], length 36
19:35:39.869626 IP logstash-02.53754 > kafka.9092: Flags [.], ack 564, win 31557, options [nop,nop,TS val 1293466441 ecr 1098280484], length 0
19:35:39.870390 IP logstash-02.53754 > kafka.9092: Flags [.], ack 599, win 31522, options [nop,nop,TS val 1293466441 ecr 1098280485], length 0
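
From what I understand, the initial connection does go to the IP in bootstrap_servers (which would explain the traffic above), but the broker then returns the hostname from its advertised listeners in the metadata response, and the follow-up connections use that name instead. One way to see what the broker is advertising, run on the Kafka host itself with the bundled CLI tools:

bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.40.211:9092
# the first line of output shows the address the broker advertises,
# e.g. something like: kafka:9092 (id: 0 rack: null) -> ...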

What does your Kafka config look like? More specifically, the listeners and advertised.listeners lines.

Hi @leandrojmp, thanks for the reply.

The Kafka config has the default values:

#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

I was able to make it work by adding an entry for the hostname "kafka" to the /etc/hosts file on the Logstash server.
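
Something along these lines, mapping the advertised hostname to the broker IP from bootstrap_servers:

192.168.40.211   kafka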

But I'm interested to know how it would be done through the Kafka config.
And in the case where we have a Kafka cluster, do we list all the Kafka brokers in the listeners field, or does each broker only put its own IP address or hostname?
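
For what it's worth, my current understanding (a sketch with placeholder hostnames, untested here): advertised.listeners is a per-broker setting, so on a single broker the fix through the Kafka config would be to advertise an address the Logstash host can actually resolve, e.g.

advertised.listeners=PLAINTEXT://192.168.40.211:9092

and in a cluster each broker advertises only its own address, while the client lists several brokers in bootstrap_servers:

# broker 1 server.properties (placeholder hostname)
advertised.listeners=PLAINTEXT://kafka-01.example.local:9092

# broker 2 server.properties (placeholder hostname)
advertised.listeners=PLAINTEXT://kafka-02.example.local:9092

# Logstash side: several brokers just for bootstrapping
input {
  kafka {
    bootstrap_servers => "kafka-01.example.local:9092,kafka-02.example.local:9092"
    topics => ["test-topic"]
  }
}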

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.