Kafka input and output plugin for logstash is not working

Hello All,

I am using Logstash 2.1.1 and Kafka 2.10-0.8.2.1, and I am getting the following configuration errors. What are the minimum required settings for the Kafka plugin?

Input.conf
input {
  tcp {
    port => 5010
    #type => syslog
  }
  udp {
    port => 5010
    #type => syslog
  }
}
output {
  kafka {
    zk_connect => "10.195.115.15:2181"
    topic_id => "logstash-kafka"
  }
}

Output.conf:
input {
  kafka {
    host => "10.195.115.15"
    port => "9092"
    type => "gaurav-type"
    topic => "logstash-kafka"
    message_format => "json_event"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

Error :
For input.conf
bash-3.2$ bin/logstash -f ../l1.conf --configtest
Unknown setting 'zk_connect' for kafka {:level=>:error}
Error: Something is wrong with your configuration.

For output.conf
bash-3.2$ bin/logstash -f ../l2.conf --configtest
Error: The setting message_format in plugin kafka is obsolete and is no longer available. Setting is no longer valid. If you have any questions about this, you are invited to visit https://discuss.elastic.co/c/logstash and ask.

I would like a simple Kafka config that does the following:
1: Listen for syslog events on a given port
2: Put each syslog message on a Kafka topic
3: Pick the messages up from the Kafka topic
4: Parse them
5: Put them in Elasticsearch

Thanks,
Gaurav

Change message_format in the Kafka output to codec => json {}

Thanks, mate, but it's still not working.
Here is the updated config file:
input {
  tcp {
    port => 5010
    #type => syslog
  }
  udp {
    port => 5010
    #type => syslog
  }
}

output {
  kafka {
    zk_connect => "10.195.115.15:2181"
    topic_id => "logstash-kafka"
    codec => json {}
  }
}

bash-3.2$ bin/logstash -f ../l1.conf --configtest
Unknown setting 'zk_connect' for kafka {:level=>:error}
Error: Something is wrong with your configuration.

Thanks,
Gaurav

Make sure your parameters match the plugin. For example, a Kafka output
needs bootstrap_servers, while a Kafka input needs zk_connect. See:
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html
and
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
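
For reference, here is a minimal sketch of the two pipelines described in the question. It assumes a plugin version whose options match the docs linked above (bootstrap_servers on the output, zk_connect on the input); older versions may use different option names, so check the docs for the version you have installed. The broker port 9092 is taken from the original post, while the grok pattern and the Elasticsearch address are assumptions for illustration only.

```
# Shipper (l1.conf): listen for syslog events and publish them to a Kafka topic
input {
  tcp { port => 5010 }
  udp { port => 5010 }
}

output {
  kafka {
    # Kafka broker address, not ZooKeeper (port 9092 taken from the original post)
    bootstrap_servers => "10.195.115.15:9092"
    topic_id => "logstash-kafka"
    codec => json {}
  }
}
```

```
# Indexer (l2.conf): consume from the topic, parse, and index
input {
  kafka {
    # ZooKeeper address, which this generation of the input plugin connects through
    zk_connect => "10.195.115.15:2181"
    topic_id => "logstash-kafka"
    codec => json {}
  }
}

filter {
  grok {
    # Assumption: events are standard syslog lines; adjust the pattern to your data
    match => { "message" => "%{SYSLOGLINE}" }
  }
}

output {
  elasticsearch {
    # Assumption: Elasticsearch runs locally; the option name may differ by plugin version
    hosts => ["localhost:9200"]
  }
}
```

Running each file with --configtest, as in the original post, should show whether your installed plugin versions accept these option names.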

Thanks for the reply, Joe.

I'm actually at a similar stage. From looking at older examples on the internet, it looks like there used to be a setting called broker_list where you would assign the address of your Kafka broker. However, it now seems to have changed to bootstrap_servers.

Is that a correct interpretation?

I'm seeing other examples
kafka { host => "127.0.0.1" port => 9092 topic => "logstash" } [Reference:https://gist.github.com/jeroenvandijk/4963802]

output {
  kafka {
    :broker_list => "kafka1.mmlac.com:9092"
    :topic_id => "logstash"
    :compression_codec => "snappy"
    :request_required_acks => 1
  }
}
[Reference:http://blog.mmlac.com/how-to-pre-process-logs-with-logstash/]

Kind regards,
Sarah

Yeah, always refer to the plugin docs for your version. Generally, configuration keys on the Logstash side are changed to match the names Kafka itself uses for the same settings.

So, for example, Kafka producers in 0.8 called it metadata.broker.list (http://kafka.apache.org/082/documentation.html#producerconfigs), while the new Kafka 0.9 producer calls it bootstrap.servers (http://kafka.apache.org/documentation.html#producerconfigs).

I know it is confusing but that is the reason for most of the changes.

I guess the reason I am confused is that when I look at the Kafka output plugin documentation page, https://www.elastic.co/guide/en/logstash/2.2/plugins-outputs-kafka.html, there is no longer an option for broker_list.

Under bootstrap_servers it says, "This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers."

So does this mean that where I would previously have put the host and port in broker_list, I now use bootstrap_servers?

Thanks,

Yes, they are functionally equivalent, but the new name describes what the parameter does more accurately. Whereas broker_list implied that you supply the list of brokers to communicate with, bootstrap_servers makes clear that you are providing a list of servers to bootstrap from. In both cases the Kafka producer only uses those servers to fetch the latest metadata.

So yes, whatever your broker_list was, you can use that value in bootstrap_servers.
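
As a rough sketch of that rename, using the host from the example quoted earlier in the thread (check the option names against the docs for your plugin version):

```
output {
  kafka {
    # Older plugin versions: broker_list => "kafka1.mmlac.com:9092"
    bootstrap_servers => "kafka1.mmlac.com:9092"
    topic_id => "logstash"
  }
}
```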

Thanks Joe, I appreciate your prompt feedback. It's been a big help.
Sarah

The Logstash input plugin for Kafka is not working for me. Here is my conf file:

input {
  kafka { topic_id => 'logstashlogs' }
}
output {
  stdout {}
}
Logstash starts without any error, but nothing is printed to the screen. The Kafka topic (logstashlogs) does have data. I am able to write data to the Kafka topic using Logstash, but I am not able to read it back. Any suggestions?

You should start a new thread for this question.

Try running it with --verbose or --debug to see if an error shows up. https://www.elastic.co/guide/en/logstash/current/command-line-flags.html

I created a new topic:
"Logstash Input Plugin for kafka 0.9 is not working"