Kafka-input


(vimal manubhai patel) #1

Hello, I am using kafka-input with stunnel.

stunnel config file

fips = no
[broker1]
accept = 127.0.0.1:9195
connect = 10.25.37.3:9193
client = yes
cert = /usr/local/etc/stunnel/kafka-cli-stunnel.pem

[broker2]
accept = 127.0.0.1:9197
connect = 10.25.37.10:9193
client = yes
cert = /usr/local/etc/stunnel/kafka-cli-stunnel.pem

[broker3]
accept = 127.0.0.1:9198
connect = 10.25.37.11:9193
client = yes
cert = /usr/local/etc/stunnel/kafka-cli-stunnel.pem

[broker4]
accept = 127.0.0.1:9199
connect = 10.25.37.5:9193
client = yes
cert = /usr/local/etc/stunnel/kafka-cli-stunnel.pem

Logstash config file

Parts of the kafka-local Logstash configuration:

input {
  kafka {
    zk_connect => "edhtgden1101.kdc.xxxx.com:2181,edhtgden1103.kdc.xxxx.com:2181,edhtgden1104.kdc.xxxx.com:2181,edhtgden1105.kdc.xxxx.com:2181/kafka2"
    topic_id => "FEEDZAI.EVENT.OAO.DEBITELIGIBILITY"
    reset_beginning => true
    consumer_threads => 4
    codec => json {}
  }
}

output {
  file {
    path => "/Users/iwg323/Documents/feedzai-kakfa"
  }
}

When I run Logstash, it throws the error below:

log4j, [2015-11-03T14:30:09.874] WARN: kafka.client.ClientUtils$: Fetching topic metadata with correlation id 6 for topics [Set(FEEDZAI.EVENT.OAO.DEBITELIGIBILITY)] from broker [id:1,host:localhost,port:9195] failed
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:206)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at kafka.utils.Utils$.read(Utils.scala:380)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Any ideas?


(Joe Lawson) #2

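Make sure each broker's host.name (the address it registers in ZooKeeper) matches the address the consumer will actually connect to, which in this setup is the local stunnel listener. Check out this comment on librdkafka about their stunnel setup: https://github.com/edenhill/librdkafka/issues/154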
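
A rough way to check this, as a sketch only: compare the broker endpoints that show up in the client's WARN lines with the ports stunnel accepts on locally. The function names below are made up for illustration, the regex assumes the `[id:...,host:...,port:...]` format seen in the log above, and only ports are compared (an assumption, since `localhost` and `127.0.0.1` would not match as strings).

```python
import re

# Matches the broker description in the Kafka client's WARN line, e.g.
# "from broker [id:1,host:localhost,port:9195] failed"
BROKER_RE = re.compile(r"broker \[id:(\d+),host:([^,\]]+),port:(\d+)\]")


def stunnel_accept_ports(config_text):
    """Map each stunnel service name to the local port it accepts on."""
    ports = {}
    service = None
    for raw in config_text.splitlines():
        line = raw.strip()
        if not line or line.startswith(";"):
            continue  # skip blank lines and ';' comments
        if line.startswith("[") and line.endswith("]"):
            service = line[1:-1]
            continue
        if service is None or "=" not in line:
            continue  # global options such as "fips = no"
        key, value = (part.strip() for part in line.split("=", 1))
        if key == "accept":
            # "127.0.0.1:9195" -> 9195; a bare "9195" also works
            ports[service] = int(value.rpartition(":")[2])
    return ports


def brokers_in_log(log_text):
    """Return the set of (host, port) pairs the client tried to reach."""
    return {(host, int(port)) for _id, host, port in BROKER_RE.findall(log_text)}


def brokers_without_tunnel(log_text, config_text):
    """List brokers from the log whose port has no stunnel listener.

    Assumption: matching on port alone is good enough for a local check.
    """
    local_ports = set(stunnel_accept_ports(config_text).values())
    return sorted(b for b in brokers_in_log(log_text) if b[1] not in local_ports)
```

For the single WARN line quoted above this returns an empty list, because port 9195 is served by [broker1]. It only tells you whether the advertised ports line up with the tunnel config; it says nothing about whether the TLS leg from stunnel to the real broker is working.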

