Kafka-Elasticsearch Logstash Configuration Error

I have a Logstash pipeline that reads data from Kafka and sends it to Elasticsearch. However, the data is not represented correctly in Elasticsearch.
I want each field to be stored simply as field: value, but instead it appears as field: [value, field]. I could not figure out why.


Here is my Logstash configuration file:

# Kafka source: subscribe to all three topics, decode each record with the
# json codec, and attach Kafka metadata under [@metadata][kafka]. The topic
# conditionals in the filter and output sections read that metadata.
input {
    kafka {
        bootstrap_servers => "kafka:9092"
        topics            => [ "tweet_user_id", "detailed_users", "detailed_tweets" ]
        codec             => "json"
        decorate_events   => true
    }
}

# Filter: applies only to events whose source Kafka topic is "detailed_users".
# NOTE(review): this block causes the field: [value, field] arrays.
# The kafka input decodes records with codec => "json", which already puts
# each JSON key (user_id, user_name, ...) at the top level of the event.
# There is no [message][...] object to copy from. Each add_field below
# therefore appends a second value (the unresolved "%{[message][...]}"
# reference) to a field that already exists, which turns it into an array.
# Fix: delete the add_field entries. The fields are already where they are
# wanted.
filter {
    if [@metadata][kafka][topic] == "detailed_users" {
        mutate {
            # Each add_field copies a value from [message][...] into a
            # top-level field of the same name (see NOTE above the filter).
            add_field => { 
                "user_id" => "%{[message][user_id]}"
            }
            add_field => {
                "user_name" => "%{[message][user_name]}"
            }
            add_field => {
                "bio" => "%{[message][bio]}"
            }
            add_field => {
                "location" => "%{[message][location]}"
            }
            add_field => {
                "website" => "%{[message][website]}"
            }
            add_field => {
                "join_date" => "%{[message][join_date]}"
            }
            add_field => {
                "tweet_count" => "%{[message][tweet_count]}"
            }
            add_field => {
                "following_count" => "%{[message][following_count]}"
            }
            add_field => {
                "followers_count" => "%{[message][followers_count]}"
            }
            add_field => {
                "like_count" => "%{[message][like_count]}"
            }
            # Drops the "message" and "event" fields from the event.
            remove_field => ["message", "event"]
        }
    }
}


# Route each event to an Elasticsearch index chosen by its source Kafka
# topic, then echo every event to stdout as one JSON document per line.
output {
    if [@metadata][kafka][topic] == "tweet_user_id" {
        elasticsearch {
            index => "tweet_user_id_index"
            hosts => [ "elasticsearch:9200" ]
        }
    } else if [@metadata][kafka][topic] == "detailed_users" {
        elasticsearch {
            index => "detailed_users_index"
            hosts => [ "elasticsearch:9200" ]
        }
    } else if [@metadata][kafka][topic] == "detailed_tweets" {
        elasticsearch {
            index => "detailed_tweets_index"
            hosts => [ "elasticsearch:9200" ]
        }
    }

    # Debug echo: unconditional, so it runs for every event regardless of topic.
    stdout { codec => json_lines }
}

Thanks in advance.

The json codec already creates the fields at the top level of the event. You are then trying to copy them from inside the [message] field using mutate+add_field. Those source fields do not exist, so the %{[message][...]} references are not resolved, and the literal strings are added alongside the existing values. This converts all of the fields into arrays. Remove all of the add_field mutates.

2 Likes

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.