Thanks Mark!!
At last, the issue has been fixed by editing the 'host.name' setting in Kafka's config file "server.properties".
I believe host.name is 'localhost' by default, so any remote producer will fail to send data: its broker_list points at the broker's IP '10.10.50.58', the socket connection fails at that point, and I receive the 'FailedToSendMessageException'.
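For reference, this is roughly the change I made in server.properties (the Kafka install path is an assumption; the IP is the broker address used in the config below):

# <kafka-install-dir>/config/server.properties
# Use the broker's LAN IP instead of the default so remote producers can reach it
host.name=10.10.50.58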
BTW, I also ran the same test from a remote Linux machine, and the same exception can be reproduced with exactly the same configuration. It turns out the issue is not related to the OS platform; it is just a Kafka configuration problem.
/home/soft/logstash-1.5.2/etc/ls-shipper.conf
input {
  stdin {}
}
output {
  stdout { codec => rubydebug }
  kafka {
    broker_list => "10.10.50.58:9092"
    topic_id    => "logstash_logs"
  }
}
Run Logstash, type some input ('003'), and the Kafka producer fails with the same 'FailedToSendMessageException':
[root@Log etc]#/home/soft/logstash-1.5.2/bin/logstash -f /home/soft/logstash-1.5.2/etc/ls-shipper.conf
Logstash startup completed
003
{
"message" => "003",
"@version" => "1",
"@timestamp" => "2015-07-10T02:35:33.335Z",
"host" => "Log"
}
log4j, [2015-07-10T10:35:34.069] WARN: kafka.producer.async.DefaultEventHandler: Failed to send producer request with correlation id 2 to broker 0 with data for partitions [logstash_logs,0]
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
...
log4j, [2015-07-10T10:35:34.586] ERROR: kafka.producer.async.DefaultEventHandler: Failed to send requests for topics logstash_logs with correlation ids in [0,12]
kafka producer threw exception, restarting {:exception=>kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries., :level=>:warn}
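For anyone hitting the same thing, a quick way to check the broker from the remote machine without Logstash is Kafka's own console producer (the Kafka install path is an assumption; the broker IP and topic are the ones from the config above):

/path/to/kafka/bin/kafka-console-producer.sh --broker-list 10.10.50.58:9092 --topic logstash_logs

If the broker is still using the default host.name, this should fail the same way; after setting host.name to the broker's IP and restarting Kafka, lines typed here should be accepted, and the Logstash output above starts working as well.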