Pushing data to a Kafka topic using Logstash

We are using Kafka and integrating it with Logstash 2.3.0 and Elasticsearch 2.3.0.

We are able to read from the Kafka topic using the "logstash-input-kafka (2.0.6)" input plugin and store the data in Elasticsearch.

However, we are not able to push data to a Kafka topic using the "logstash-output-kafka (2.0.3)" output plugin. We tried the command below. It does not show any error, but no data typed on the console reaches the Kafka topic.

logstash -e "input { stdin {} } output { kafka { topic_id => 'logstash_logs' } }"

Please help with this.

Did you try with --debug to see what is happening?

I tried with the "--debug" option and the result is below. I don't see any error and the pipeline is created, but I don't know why it is not writing to the Kafka topic. I am able to write to the Kafka topic using kafka-console-producer.sh.
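For reference, a console-producer check like the one mentioned above would look something like this (the broker address here is an assumption; adjust it to your setup):

```shell
# Hypothetical sanity check with the Kafka console producer:
# lines typed after this command should appear in the topic.
# The broker address is a placeholder.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic logstash_logs
```

If this works but the Logstash output does not, the difference in broker address between the two is a good place to look.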

bin/logstash -e "input { stdin {} } output { kafka { topic_id => 'logstash_logs' } }" --debug

Plugin not defined in namespace, checking for plugin file {:type=>"input", :name=>"stdin", :path=>"logstash/inputs/stdin", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
Plugin not defined in namespace, checking for plugin file {:type=>"codec", :name=>"line", :path=>"logstash/codecs/line", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
config LogStash::Codecs::Line/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
config LogStash::Codecs::Line/@delimiter = "\n" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
config LogStash::Inputs::Stdin/@codec = <LogStash::Codecs::Line charset=>"UTF-8", delimiter=>"\n"> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
config LogStash::Inputs::Stdin/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
Plugin not defined in namespace, checking for plugin file {:type=>"output", :name=>"kafka", :path=>"logstash/outputs/kafka", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
starting agent {:level=>:info, :file=>"logstash/agent.rb", :line=>"190", :method=>"execute"}
starting pipeline {:id=>"main", :level=>:info, :file=>"logstash/agent.rb", :line=>"444", :method=>"start_pipeline"}
Settings: Default pipeline workers: 16
config LogStash::Codecs::JSON/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
config LogStash::Outputs::Kafka/@topic_id = "logstash_logs" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"141", :method=>"config_init"}
log4j java properties setup {:log4j_level=>"DEBUG", :level=>:debug, :file=>"logstash/logging.rb", :line=>"89", :method=>"setup_log4j"}
Registering kafka producer {:topic_id=>"logstash_logs", :bootstrap_servers=>"localhost:9092", :level=>:info, :file=>"logstash/outputs/kafka.rb", :line=>"128", :method=>"register"}
Will start workers for output {:worker_count=>1, :class=>LogStash::Outputs::Kafka, :level=>:debug, :file=>"logstash/output_delegator.rb", :line=>"77", :method=>"register"}
Starting pipeline {:id=>"main", :pipeline_workers=>16, :batch_size=>125, :batch_delay=>5, :max_inflight=>2000, :level=>:info, :file=>"logstash/pipeline.rb", :line=>"192", :method=>"start_workers"}
Pipeline main started {:file=>"logstash/agent.rb", :line=>"448", :method=>"start_pipeline"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"461", :method=>"flush"}
this is also not working
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"461", :method=>"flush"}
filter received {:event=>{"@timestamp"=>2016-04-11T03:59:25.444Z, "@version"=>"1", "host"=>"VPPCAZUSW00123.cs-DNA02-BIBDTZ1-RQ29519712.d9.internal.cloudapp.net", "message"=>"this is also not working"}, :level=>:debug, :file=>"(eval)", :line=>"17", :method=>"filter_func"}
output received {:event=>{"@timestamp"=>2016-04-11T03:59:25.444Z, "@version"=>"1", "host"=>"VPPCAZUSW00123.cs-DNA02-BIBDTZ1-RQ29519712.d9.internal.cloudapp.net", "message"=>"this is also not working"}, :level=>:debug, :file=>"(eval)", :line=>"22", :method=>"output_func"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"461", :method=>"flush"}

Hi rohit, I'm also stuck on the same issue. Any luck so far?

Neither of you mentioned whether the Kafka broker is on the same server as the Logstash output.

@allenmchan, no, the Kafka broker is on a different server.

You need to check that the Logstash Kafka plugin version is correct for your Kafka version.


Additionally, you need to specify the name of the remote server and the port number to connect to.
It looks like the output is defaulting to :bootstrap_servers=>"localhost:9092".
So you need to specify your remote server and port in the Logstash config.
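A minimal sketch of the corrected command, assuming the broker runs at kafka-broker.example.com:9092 (a placeholder hostname; substitute your actual broker address):

```shell
# Same pipeline as before, but with bootstrap_servers pointed at the
# remote broker instead of the localhost:9092 default.
# "kafka-broker.example.com:9092" is a placeholder, not a real host.
bin/logstash -e "input { stdin {} } output { kafka { topic_id => 'logstash_logs' bootstrap_servers => 'kafka-broker.example.com:9092' } }"
```

The debug output above ("Registering kafka producer {... :bootstrap_servers=>\"localhost:9092\" ...}") is what shows the default being used when this option is not set.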

