Hi,
I'm trying to use the Logstash Kafka input plugin to read messages and then send them to Elasticsearch.
My Logstash version is 8.6.2
and my Kafka version is 3.5.1:
root@logstash-02:~# /usr/share/logstash/bin/logstash --version
Using bundled JDK: /usr/share/logstash/jdk
logstash 8.6.2
kafka@kafka:~/kafka$ bin/kafka-topics.sh --version
3.5.1
My Logstash config is the following:
input {
kafka {
bootstrap_servers => "192.168.40.211:9092"
topics => ["test-topic"]
}
}
filter {
}
# Using the file output to test before switching to the elasticsearch output
output {
file {
path => "/tmp/kafka-to-logstash"
}
}
When starting Logstash, I keep getting an exception when it tries to connect to the Kafka host. Here are the Logstash logs:
[2023-08-16T19:52:23,160][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] Kafka commitId: 839b886f9b732b15
[2023-08-16T19:52:23,160][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] Kafka startTimeMs: 1692215543158
[2023-08-16T19:52:23,164][INFO ][org.apache.kafka.clients.consumer.KafkaConsumer][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Subscribed to topic(s): pni-fw
[2023-08-16T19:52:23,590][INFO ][org.apache.kafka.clients.Metadata][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Cluster ID: Swch43o1Q0GBUwmbPnMjhg
[2023-08-16T19:52:23,592][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Discovered group coordinator kafka:9092 (id: 2147483647 rack: null)
[2023-08-16T19:52:23,593][WARN ][org.apache.kafka.clients.NetworkClient][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Error connecting to node kafka:9092 (id: 2147483647 rack: null)
java.net.UnknownHostException: kafka: Temporary failure in name resolution
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[?:?]
at java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:933) ~[?:?]
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1534) ~[?:?]
at java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:852) ~[?:?]
at java.net.InetAddress.getAllByName0(InetAddress.java:1524) ~[?:?]
at java.net.InetAddress.getAllByName(InetAddress.java:1381) ~[?:?]
at java.net.InetAddress.getAllByName(InetAddress.java:1305) ~[?:?]
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-2.8.1.jar:?]
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111) ~[kafka-clients-2.8.1.jar:?]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:513) ~[kafka-clients-2.8.1.jar:?]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467) ~[kafka-clients-2.8.1.jar:?]
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172) ~[kafka-clients-2.8.1.jar:?]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985) [kafka-clients-2.8.1.jar:?]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311) [kafka-clients-2.8.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:575) [kafka-clients-2.8.1.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:851) [kafka-clients-2.8.1.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:832) [kafka-clients-2.8.1.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) [kafka-clients-2.8.1.jar:?]
at org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:116) [jruby.jar:?]
at org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136) [jruby.jar:?]
at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66) [jruby.jar:?]
at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) [jruby.jar:?]
at org.jruby.runtime.Block.call(Block.java:143) [jruby.jar:?]
at org.jruby.RubyProc.call(RubyProc.java:309) [jruby.jar:?]
at org.jruby.javasupport.Java$ProcToInterface.callProc(Java.java:1232) [jruby.jar:?]
at org.jruby.javasupport.Java$ProcToInterface.access$300(Java.java:1209) [jruby.jar:?]
at org.jruby.javasupport.Java$ProcToInterface$ConcreteMethod.call(Java.java:1270) [jruby.jar:?]
at org.jruby.gen.InterfaceImpl1600050383.run(org/jruby/gen/InterfaceImpl1600050383.gen:13) [jruby.jar:?]
at java.lang.Thread.run(Thread.java:833) [?:?]
[2023-08-16T19:52:23,602][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Group coordinator kafka:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted.
[2023-08-16T19:52:23,703][WARN ][org.apache.kafka.clients.NetworkClient][main][0f8f29ca9fee4ab290b4d1ea39c4a70c9eb36cb28241b18eab7fac24cc486a13] [Consumer clientId=logstash-0, groupId=logstash] Error connecting to node kafka:9092 (id: 0 rack: null)
java.net.UnknownHostException: kafka
It seems to me that it's trying to connect using the hostname kafka instead of the IP address I'm specifying in the input section. Am I understanding this correctly?
However, when running tcpdump on the Kafka host, I can see incoming packets from the Logstash host, but nothing is being read from the Kafka topic and nothing is written to the output file.
kafka@kafka:~/kafka$ sudo tcpdump src 192.168.40.136 and dst port 9092
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on ens3, link-type EN10MB (Ethernet), capture size 262144 bytes
19:35:39.751427 IP logstash-02.53754 > kafka.9092: Flags [S], seq 1794425554, win 32120, options [mss 1460,sackOK,TS val 1293466322 ecr 0,nop,wscale 0], length 0
19:35:39.751932 IP logstash-02.53754 > kafka.9092: Flags [.], ack 2806399270, win 32120, options [nop,nop,TS val 1293466323 ecr 1098280366], length 0
19:35:39.793194 IP logstash-02.53754 > kafka.9092: Flags [P.], seq 0:50, ack 1, win 32120, options [nop,nop,TS val 1293466364 ecr 1098280366], length 50
19:35:39.794830 IP logstash-02.53754 > kafka.9092: Flags [.], ack 447, win 31674, options [nop,nop,TS val 1293466366 ecr 1098280409], length 0
19:35:39.868479 IP logstash-02.53754 > kafka.9092: Flags [P.], seq 50:103, ack 447, win 31674, options [nop,nop,TS val 1293466439 ecr 1098280409], length 53
19:35:39.868876 IP logstash-02.53754 > kafka.9092: Flags [P.], seq 103:139, ack 447, win 31674, options [nop,nop,TS val 1293466440 ecr 1098280409], length 36
19:35:39.869626 IP logstash-02.53754 > kafka.9092: Flags [.], ack 564, win 31557, options [nop,nop,TS val 1293466441 ecr 1098280484], length 0
19:35:39.870390 IP logstash-02.53754 > kafka.9092: Flags [.], ack 599, win 31522, options [nop,nop,TS val 1293466441 ecr 1098280485], length 0
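
To test my guess about the hostname, something like the following could be run on the Logstash host. This is only a rough sketch: the helper name resolve is made up for illustration, and it uses Python's standard socket.getaddrinfo, which should go through the same system resolver as the Java lookup in the stack trace, though I have not confirmed the two behave identically.

```python
import socket


def resolve(host, port=9092):
    """Return the sorted unique IP addresses that `host` resolves to.

    Returns an empty list when the system resolver cannot resolve the
    name, which corresponds to the UnknownHostException in the logs.
    """
    try:
        infos = socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP)
    except socket.gaierror:
        # Name resolution failed (e.g. "Temporary failure in name resolution")
        return []
    # Each entry is (family, type, proto, canonname, sockaddr);
    # sockaddr[0] is the IP address as a string.
    return sorted({info[4][0] for info in infos})
```

If resolve("192.168.40.211") returns the address but resolve("kafka") returns an empty list, that would support the idea that the client reaches the bootstrap address fine and then fails on the hostname kafka that shows up in the "Discovered group coordinator kafka:9092" log line.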