Kafka Input plugin error


(Arvind Sharma) #1

Hi All,

I have set up Kafka and am trying to use the Kafka input and output plugins.
For higher availability I set up multiple ZooKeeper instances and multiple brokers, all on different machines:
zk1:2181, zk2:2181, zk3:2181 (ZooKeeper instances, using the ZooKeeper package from its website)
kbr1:9092, kbr2:9092 (Kafka broker instances)

The following error is coming up:

kafka client threw exception, restarting {:exception=>java.net.UnknownHostException: logstash2: logstash2: unknown error, :level=>:warn, :file=>"logstash/inputs/kafka.rb", :line=>"150", :method=>"run"}
^CSIGINT received. Shutting down the pipeline. {:level=>:warn, :file=>"logstash/agent.rb", :line=>"126", :method=>"execute"}
log4j, [2015-10-01T13:30:43.766] DEBUG: org.I0Itec.zkclient.ZkConnection: Creating new ZookKeeper instance to connect to zk1:2181,zk2:2181:2181,zk3:2181.
log4j, [2015-10-01T13:30:43.766] INFO: org.apache.zookeeper.ZooKeeper: Initiating client connection, connectString=zk1:2181,zk2:2181:2181,zk3:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@1e472620
log4j, [2015-10-01T13:30:43.766] DEBUG: org.I0Itec.zkclient.ZkClient: Closing ZkClient...
log4j, [2015-10-01T13:30:43.767] INFO: org.I0Itec.zkclient.ZkEventThread: Starting ZkClient event thread.
log4j, [2015-10-01T13:30:43.768] DEBUG: org.I0Itec.zkclient.ZkClient: Closing ZkClient...done
log4j, [2015-10-01T13:30:43.768] INFO: kafka.utils.VerifiableProperties: Verifying properties
log4j, [2015-10-01T13:30:43.769] INFO: kafka.utils.VerifiableProperties: Property auto.commit.enable is overridden to true
log4j, [2015-10-01T13:30:43.769] INFO: kafka.utils.VerifiableProperties: Property auto.commit.interval.ms is overridden to 1000
log4j, [2015-10-01T13:30:43.769] INFO: kafka.utils.VerifiableProperties: Property auto.offset.reset is overridden to largest
log4j, [2015-10-01T13:30:43.769] INFO: kafka.utils.VerifiableProperties: Property consumer.timeout.ms is overridden to -1
log4j, [2015-10-01T13:30:43.769] INFO: kafka.utils.VerifiableProperties: Property fetch.message.max.bytes is overridden to 1048576
log4j, [2015-10-01T13:30:43.769] INFO: kafka.utils.VerifiableProperties: Property fetch.min.bytes is overridden to 1
log4j, [2015-10-01T13:30:43.769] INFO: kafka.utils.VerifiableProperties: Property fetch.wait.max.ms is overridden to 100
log4j, [2015-10-01T13:30:43.770] INFO: kafka.utils.VerifiableProperties: Property group.id is overridden to t
log4j, [2015-10-01T13:30:43.770] INFO: kafka.utils.VerifiableProperties: Property queued.max.message.chunks is overridden to 10
log4j, [2015-10-01T13:30:43.770] INFO: kafka.utils.VerifiableProperties: Property rebalance.backoff.ms is overridden to 2000
log4j, [2015-10-01T13:30:43.770] INFO: kafka.utils.VerifiableProperties: Property rebalance.max.retries is overridden to 4
log4j, [2015-10-01T13:30:43.770] INFO: kafka.utils.VerifiableProperties: Property refresh.leader.backoff.ms is overridden to 200
log4j, [2015-10-01T13:30:43.770] INFO: kafka.utils.VerifiableProperties: Property socket.receive.buffer.bytes is overridden to 65536
log4j, [2015-10-01T13:30:43.770] INFO: kafka.utils.VerifiableProperties: Property socket.timeout.ms is overridden to 30000
log4j, [2015-10-01T13:30:43.771] INFO: kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to logstash1.shopclues.com:2181,logstash2.shopclues.com:2181:2181,logstash3.shopclues.com:2181
log4j, [2015-10-01T13:30:43.771] INFO: kafka.utils.VerifiableProperties: Property zookeeper.connection.timeout.ms is overridden to 6000
log4j, [2015-10-01T13:30:43.771] INFO: kafka.utils.VerifiableProperties: Property zookeeper.session.timeout.ms is overridden to 6000
log4j, [2015-10-01T13:30:43.771] INFO: kafka.utils.VerifiableProperties: Property zookeeper.sync.time.ms is overridden to 2000
Sending shutdown signal to input thread {:thread=>#<Thread:0x7b558db9 sleep>, :level=>:info, :file=>"logstash/pipeline.rb", :line=>"260", :method=>"shutdown"}
Plugin is finished {:plugin=><LogStash::Inputs::Kafka zk_connect=>"zk1:2181,zk2:2181:2181,zk3:2181", group_id=>"t", topic_id=>"test", reset_beginning=>true, consumer_threads=>1, consumer_restart_on_error=>true, consumer_restart_sleep_ms=>100, decorate_events=>true, debug=>false, codec=><LogStash::Codecs::JSON charset=>"UTF-8">, auto_offset_reset=>"largest", queue_size=>20, rebalance_max_retries=>4, rebalance_backoff_ms=>2000, consumer_timeout_ms=>-1, fetch_message_max_bytes=>1048576, decoder_class=>"kafka.serializer.DefaultDecoder", key_decoder_class=>"kafka.serializer.DefaultDecoder">, :level=>:info, :file=>"logstash/plugin.rb", :line=>"61", :method=>"finished"}
Plugin is finished {:plugin=><LogStash::Outputs::Stdout codec=><LogStash::Codecs::RubyDebug metadata=>false>, workers=>1>, :level=>:info, :file=>"logstash/plugin.rb", :line=>"61", :method=>"finished"}
Pipeline shutdown complete. {:level=>:info, :file=>"logstash/pipeline.rb", :line=>"101", :method=>"run"}


(Joe Lawson) #2

Make sure the host.name property on the brokers is something resolvable by
the consumers.
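
A quick way to verify this from a consumer machine is to check that each broker and ZooKeeper hostname resolves. A minimal sketch, assuming `getent` is available and using the hostnames from this thread (substitute your own):

```shell
# Verify that the ZooKeeper and broker hostnames resolve on this machine.
check_host() {
  if getent hosts "$1" > /dev/null; then
    echo "$1 resolves"
  else
    echo "$1 DOES NOT resolve"
  fi
}

# The consumer must be able to resolve *all* of these.
for h in zk1 zk2 zk3 kbr1 kbr2; do
  check_host "$h"
done
```

Any hostname reported as not resolving needs an /etc/hosts entry (or a DNS record) on the consumer.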


(Arvind Sharma) #3

@Joe_Lawson

I am still receiving the same error. host.name is set to kb1 and kb2, and entries for both servers are in the /etc/hosts file on the consumer.
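
For reference, this is the kind of layout I mean; a hypothetical /etc/hosts on the consumer (the IPs are made up for illustration, only the hostnames come from this thread):

```
# /etc/hosts on the consumer (illustrative IPs)
10.0.0.11   kbr1
10.0.0.12   kbr2
10.0.0.21   zk1
10.0.0.22   zk2
10.0.0.23   zk3
```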


(Joe Lawson) #4

Can you post your logstash config?


(Arvind Sharma) #5

@Joe_Lawson

Here's the config

input {
  kafka {
    zk_connect => "zk1:2181,zk2:2181:2181,zk3:2181"
    group_id => "t"
    topic_id => "test"
    reset_beginning => true
    consumer_threads => 1
    consumer_restart_on_error => false
    consumer_restart_sleep_ms => 100
    decorate_events => true
  }
}

output { stdout { codec => rubydebug } }


(Arvind Sharma) #6

@Joe_Lawson is there something that can be done in this case?


(Joe Lawson) #7

Seems like logstash2 is an unknown host to the consumer. Is that the
hostname of something?


(Arvind Sharma) #8

@Joe_Lawson

I figured it out. The Logstash Kafka consumer was getting its hostname as logstash2 from /etc/sysconfig/network, but in the /etc/hosts file that name was mapped as ls2 for 127.0.0.1. One more thing: the Kafka Logstash consumers need to resolve both the ZooKeeper hosts and the Kafka broker hosts, so I added entries for both in the /etc/hosts file (earlier I had only added entries for the ZooKeeper hosts).
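
The mismatch can be caught with a quick check: compare the hostname the machine reports with what the resolver knows about it. A sketch, assuming `getent` is available:

```shell
# If the machine's own hostname does not resolve, Kafka's consumer fails
# with java.net.UnknownHostException, as in the log above.
h="$(hostname)"
if getent hosts "$h" > /dev/null; then
  echo "$h is resolvable"
else
  echo "$h is NOT resolvable; add it to /etc/hosts"
fi
```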

btw thanks


(system) #9