Logstash-output Kafka: message load balancing

Is there a good example of specifying the message_key for the Logstash Kafka output so that messages are balanced across the different partitions on the Kafka server side? I have 3 partitions in the topic and am running 3 Logstash instances, and I'd like each Logstash instance to read messages off a different partition. Right now it looks like all the messages are going to the same partition.

I am using the timestamp as the message key on the client side. The order of messages is not critical.

Thanks

What version of things are you running and what does your config look like?

Logstash: 2.0.0-1.noarch.rpm
Kafka: 2.11-0.9.0.0

According to the Kafka docs, isn't it supposed to be round-robin if no message key is supplied?

Test A) I have a script to convert the log messages and then pipe them to Logstash.

input {
  stdin {
    codec => "json"
  }
}

filter {
  date {
    match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS" ]
    locale => en
  }
}

output {
  kafka {
    #broker_list => "10.4.78.31:9092"
    bootstrap_servers => "10.4.78.31:9092"
    acks => "0"
    topic_id => "kafka-logs-new"
    message_key => "%{timestamp}"
  }
}

In this case, without message_key specified, the messages are distributed across the 3 partitions.
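To be concrete, "without message_key specified" just means the same output with the key line commented out:

output {
  kafka {
    bootstrap_servers => "10.4.78.31:9092"
    acks => "0"
    topic_id => "kafka-logs-new"
    #message_key => "%{timestamp}"   # commented out -> the producer picks partitions itself
  }
}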

Test B) I have Logstash installed on a cluster of servers. The following is a snippet of the Logstash conf file:
input {
  file {
    type => "zookeeper"
    path => "/home//logs/zookeeper.out.*"
    exclude => "*.gz"
  }
  file {
    type => "cassandra"
    path => "/home//logs/cassandra/system*"
  }
  .........
}
filter {
  .....
  grok {
    match => {
      "message" => "(?<message_hash_key>[\d\w\s\W]{30})"
    }
  }
}
output {
  kafka {
    bootstrap_servers => "10.4.78.31:9092"
    topic_id => "kafka-logs-new"
    #message_key => "%{message_hash_key}"
  }
}

Since I can NOT use "@timestamp" as the "message_key" in the output, I just parse out the first 30 characters of the log message, which contain the timestamp plus other information.

In Test B, all the messages go to the same partition. My goal is to distribute the logs among the 3 partitions so that the 3 Logstash servers can each read messages off their own partition.
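For reference, the consumer side I'm aiming for on each of the 3 Logstash servers is roughly this (a sketch assuming the 2.x kafka input plugin options; the ZooKeeper address and group name are placeholders):

input {
  kafka {
    zk_connect => "10.4.78.31:2181"     # ZooKeeper used by the 0.9 high-level consumer
    topic_id => "kafka-logs-new"
    group_id => "logstash-consumers"    # same group_id on all 3 instances
    consumer_threads => 1               # one thread each, so each instance takes one partition
  }
}

With all 3 instances in the same consumer group, Kafka should assign one partition to each, provided the producer side actually spreads the messages.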

Thanks

Try adding a field to the event with the mutate plugin in the filter section, then assign the new field to message_key.
In my case I had a JSON event.

input {
  file {
    path => "/usr/local/tomcat/log/event.log"
    #start_position => beginning
    #ignore_older => 0
  }
}
filter {
  json {
    source => "message"
    target => "message"
  }
  mutate {
    add_field => {
      "user" => "%{[message][userId]}"
    }
  }
}
output {
  kafka {
    topic_id => "topic-name"
    bootstrap_servers => "server1:9092,server2:9092,server3:9092"
    message_key => "%{user}"
  }
}
This code works!
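If the logs aren't JSON, as in Test B above, the same mutate trick works with any existing field. A hypothetical, untested sketch keying on the default host field, so each server's logs get a stable key (the kafka_key field name is my own invention):

filter {
  mutate {
    # the file input adds a "host" field by default; copy it into a key field
    add_field => { "kafka_key" => "%{host}" }
  }
}
output {
  kafka {
    bootstrap_servers => "10.4.78.31:9092"
    topic_id => "kafka-logs-new"
    message_key => "%{kafka_key}"
  }
}

Keep in mind the key only determines the partition via hashing, so identical keys always land on the same partition; with only 3 distinct keys and 3 partitions, two servers can still hash to the same partition.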
