Logstash Kafka Output plugins not working on Windows


(Stephen Zhou) #1

Hi folks,
Kafka output plugin is working perfectly on Linux but it seems not working correctly on Windows, not sure how to debug it.

When I make test on Linux, it's working fine:

  • Linux(CentOS6.5/ jdk-8u45/ kafka_2.10-0.8.2.1/ logstash-1.5.2/ IP: 10.10.50.58):

Input console, I type 'Hi, there!':

[root@es-node-02 logstash-1.5.2]# bin/logstash -e "input { stdin {} } output { kafka { topic_id => 'logstash_logs' } }"
Logstash startup completed
Hi, there!

Output console, result be printed:

[root@es-node-02 logstash-1.5.2]# bin/logstash -e "input { kafka { topic_id => 'logstash_logs' } } output { stdout { codec => rubydebug }  }"
Logstash startup completed
{
       "message" => "Hi, there!",
      "@version" => "1",
    "@timestamp" => "2015-07-09T15:30:15.444Z",
          "host" => "es-node-02"
}
  • Windows(Windows Server 2008 R2/ jdk-8u45/ logstash-1.5.2/ IP: 10.10.20.117)

However, when I make same test on Windows. When I type 'Hi', there is no result be printed at output console.
C:\logstash\bin\logstash.conf:

input { stdin {} }
output {
  stdout { codec => rubydebug }
  kafka {
    broker_list => "10.10.50.58:9092"
	topic_id => "logstash_logs"
	codec => plain {
      format => "%{message}"
    }
  }
}

Execute Logstash via cmd:

c:\logstash\bin>logstash agent -f logstash.conf --log c:\logstash\log\logstash.log
io/console not supported; tty will not be manipulated
Sending logstash logs to c:\logstash\log\logstash.log.
Hi
{
       "message" => "Hi\r",
      "@version" => "1",
    "@timestamp" => "2015-07-09T15:41:08.580Z",
          "host" => "NHST-VM-WEB02"
}

Error Log(logstash.log) shows:

{:timestamp=>"2015-07-09T23:41:13.791000+0800", :message=>"kafka producer threw exception, restarting", :exception=>kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries., :level=>:warn}

Kafka INFO(IP: 10.10.50.58):

[2015-07-09 23:41:12,678] INFO Closing socket connection to /10.10.20.117. (kafka.network.Processor)
[2015-07-09 23:41:13,953] INFO Closing socket connection to /10.10.20.117. (kafka.network.Processor)
[2015-07-09 23:41:15,075] INFO Closing socket connection to /10.10.20.117. (kafka.network.Processor)
[2015-07-09 23:41:16,195] INFO Closing socket connection to /10.10.20.117. (kafka.network.Processor)
[2015-07-09 23:41:17,339] INFO Closing socket connection to /10.10.20.117. (kafka.network.Processor)

(Mark Walkom) #2

Try using --debug and/or --verbose to give you more information.


(Stephen Zhou) #3

Thanks Mark!!

At last, issue have been fixed by modifying kafka config file: "server.properties" - 'host.name' section.
I believe host.name is 'localhost' by default, any remote producers will fail to transfer data cause their broker_list specify theIP of broker is '10.10.50.58', the socket connection failed at this place, so I received: 'FailedToSendMessageException'.

BTW, I also made same test on Linux remote machine, and the same exception can be reproduce with exactly same configuration. It turns out the issue not related with OS platform, but just kafka configuration problem.

/home/soft/logstash-1.5.2/etc/ls-shipper.conf

input {
  stdin {}
}
output {
  stdout { codec => rubydebug }
  kafka {
    broker_list => "10.10.50.58:9092"
    topic_id => "logstash_logs"
  }
}

Execute Logstash, input some words: 003 and kafka produce failed with same 'FailedToSendMessageException':

[root@Log etc]#/home/soft/logstash-1.5.2/bin/logstash -f /home/soft/logstash-1.5.2/etc/ls-shipper.conf 
Logstash startup completed
003
{
       "message" => "003",
      "@version" => "1",
    "@timestamp" => "2015-07-10T02:35:33.335Z",
          "host" => "Log"
}
log4j, [2015-07-10T10:35:34.069]  WARN: kafka.producer.async.DefaultEventHandler: Failed to send producer request with correlation id 2 to broker 0 with data for partitions [logstash_logs,0]
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
...
log4j, [2015-07-10T10:35:34.586] ERROR: kafka.producer.async.DefaultEventHandler: Failed to send requests for topics logstash_logs with correlation ids in [0,12]
kafka producer threw exception, restarting {:exception=>kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries., :level=>:warn}

(system) #4