I have a logstash pipeline which gets data from kafka and sends it to elasticsearch. However, in elasticsearch, data is not represented correctly.
In this data I want it to be just field:value. But it is field:[value,field]. I could not figure out the reason.
Here is my logstash conf file:
input {
kafka {
codec => "json"
bootstrap_servers => "kafka:9092"
topics => ["tweet_user_id", "detailed_users", "detailed_tweets"]
decorate_events => true
}
}
filter {
if [@metadata][kafka][topic] == "detailed_users" {
mutate {
add_field => {
"user_id" => "%{[message][user_id]}"
}
add_field => {
"user_name" => "%{[message][user_name]}"
}
add_field => {
"bio" => "%{[message][bio]}"
}
add_field => {
"location" => "%{[message][location]}"
}
add_field => {
"website" => "%{[message][website]}"
}
add_field => {
"join_date" => "%{[message][join_date]}"
}
add_field => {
"tweet_count" => "%{[message][tweet_count]}"
}
add_field => {
"following_count" => "%{[message][following_count]}"
}
add_field => {
"followers_count" => "%{[message][followers_count]}"
}
add_field => {
"like_count" => "%{[message][like_count]}"
}
remove_field => ["message", "event"]
}
}
}
output {
if [@metadata][kafka][topic] == "tweet_user_id" {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "tweet_user_id_index"
}
}
else if [@metadata][kafka][topic] == "detailed_users" {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "detailed_users_index"
}
}
else if [@metadata][kafka][topic] == "detailed_tweets" {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "detailed_tweets_index"
}
}
stdout {
codec => json_lines
}
}
Thanks in advance.