Pushing data to kafka topic using logstash

We are using kafka-0.8.2.1 and integrating it with logstash-2.3.0 and elasticsearch-2.3.0.

We are able to read from the kafka topic using "logstash-input-kafka (2.0.6)" input plugin and store data in elasticsearch.

However, we are not able to push data to a Kafka topic using the "logstash-output-kafka (2.0.3)" output plugin. We tried the command below. It does not show any error, but data typed at the console is not pushed to the Kafka topic.

logstash -e "input { stdin {} } output { kafka { topic_id => 'logstash_logs' } }"

Please help with this.

Did you try with --debug to see what is happening?

I tried the "--debug" option and the result is below. I don't see any error and the pipeline is created, but I don't know why it is not writing to the Kafka topic. I am able to write to the same Kafka topic using kafka-console-producer.sh.

bin/logstash -e "input { stdin {} } output { kafka { topic_id => 'logstash_logs' } }" --debug

Plugin not defined in namespace, checking for plugin file {:type=>"input", :name=>"stdin", :path=>"logstash/inputs/stdin", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
Plugin not defined in namespace, checking for plugin file {:type=>"codec", :name=>"line", :path=>"logstash/codecs/line", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
config LogStash::Codecs::Line/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
config LogStash::Codecs::Line/@delimiter = "\n" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
config LogStash::Inputs::Stdin/@codec = <LogStash::Codecs::Line charset=>"UTF-8", delimiter=>"\n"> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
config LogStash::Inputs::Stdin/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
Plugin not defined in namespace, checking for plugin file {:type=>"output", :name=>"kafka", :path=>"logstash/outputs/kafka", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
starting agent {:level=>:info, :file=>"logstash/agent.rb", :line=>"190", :method=>"execute"}
starting pipeline {:id=>"main", :level=>:info, :file=>"logstash/agent.rb", :line=>"444", :method=>"start_pipeline"}
Settings: Default pipeline workers: 16
config LogStash::Codecs::JSON/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
config LogStash::Outputs::Kafka/@topic_id = "logstash_logs" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
log4j java properties setup {:log4j_level=>"DEBUG", :level=>:debug, :file=>"logstash/logging.rb", :line=>"89", :method=>"setup_log4j"}
Registering kafka producer {:topic_id=>"logstash_logs", :bootstrap_servers=>"localhost:9092", :level=>:info, :file=>"logstash/outputs/kafka.rb", :line=>"128", :method=>"register"}
Will start workers for output {:worker_count=>1, :class=>LogStash::Outputs::Kafka, :level=>:debug, :file=>"logstash/output_delegator.rb", :line=>"77", :method=>"register"}
Starting pipeline {:id=>"main", :pipeline_workers=>16, :batch_size=>125, :batch_delay=>5, :max_inflight=>2000, :level=>:info, :file=>"logstash/pipeline.rb", :line=>"192", :method=>"start_workers"}
Pipeline main started {:file=>"logstash/agent.rb", :line=>"448", :method=>"start_pipeline"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"461", :method=>"flush"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"461", :method=>"flush"}
this is also not working
filter received {:event=>{"@timestamp"=>2016-04-11T03:59:25.444Z, "@version"=>"1", "host"=>"VPPCAZUSW00123.cs-DNA02-BIBDTZ1-RQ29519712.d9.internal.cloudapp.net", "message"=>"this is also not working"}, :level=>:debug, :file=>"(eval)", :line=>"17", :method=>"filter_func"}
output received {:event=>{"@timestamp"=>2016-04-11T03:59:25.444Z, "@version"=>"1", "host"=>"VPPCAZUSW00123.cs-DNA02-BIBDTZ1-RQ29519712.d9.internal.cloudapp.net", "message"=>"this is also not working"}, :level=>:debug, :file=>"(eval)", :line=>"22", :method=>"output_func"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"461", :method=>"flush"}

Hi Rohit, I'm also stuck with the same issue. Any luck so far?

Neither of you mentioned whether the Kafka broker is on the same server as the Logstash output.

@allenmchan, no, the Kafka broker is on a different server.

You need to check that the Logstash Kafka plugin is at the correct version for your Kafka version:

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

Additionally, you need to specify the name of the remote server and the port number to connect to.
Your debug output shows the plugin defaulting to :bootstrap_servers=>"localhost:9092", and since your broker runs on a different server, nothing is reaching it.
So you need to set bootstrap_servers to your remote server and port in the Logstash config.

Cheers,
Mick
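
For anyone landing here later, a minimal sketch of what that config change could look like. The host name kafka-broker.example.com is a placeholder (an assumption, not from this thread); replace it with your broker's real address. The bootstrap_servers setting name is taken from the debug output above.

```
input { stdin {} }

output {
  kafka {
    # Placeholder host (assumption): point this at the remote broker.
    # Without this setting the plugin falls back to localhost:9092,
    # as the "Registering kafka producer" debug line shows.
    bootstrap_servers => "kafka-broker.example.com:9092"
    topic_id => "logstash_logs"
  }
}
```

Save it to a file (for example a hypothetical kafka-out.conf), run it with bin/logstash -f kafka-out.conf, and type a line at the console. You can then check the topic from the Kafka side with kafka-console-consumer.sh, the counterpart of the kafka-console-producer.sh mentioned above.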
