Duplicate log entries

Hi,

We are facing an issue with Elasticsearch: we are seeing lots of duplicate log entries like the ones below. In logstash.yml, path.config points to a single file: path.config: "/etc/logstash/pipeline.global.conf"

Does anyone have an idea how to fix this issue?

Thank you

Where does the duplicated data reside? Is it by any chance a shared drive? How is it being indexed into Elasticsearch?

Hi Christian

Thank you for your reply. The logs originate on shared drives, but they are not duplicated there. We use Filebeat and Logstash with the following pipeline configuration:

if [client] == "iis" {
    if [indexname] {
        elasticsearch {
            hosts => [ "https://xxxxxx:9200" ]
            index => "rq-%{[client]}-%{[indexname]}-%{+YYYY.MM.dd}"

Logstash and Filebeat can have issues reading from network drives, which is not recommended. This may very well be why you are seeing duplicates.

There could also be other reasons, e.g. issues with your Logstash config, but it is hard to tell without more details around config and how frequent the duplication issue is.

Sorry, the drive is shared, but Filebeat reads it from a drive that physically resides in the same machine, not over the network.

Can I send you my pipeline config file?

Hmmm, the two documents are not necessarily duplicates as at least log.offset differs. There may not be anything wrong at all...

In that case you are right. But in another example, we checked the log file and there were no duplicated lines. In this case, only the _id and the ingest time differ:

If you are sending documents to Elasticsearch in bulk and do not specify a custom document_id, you cannot guarantee exactly-once delivery.
When the ES cluster is busy it might "reject" indexing requests (so-called back pressure). For bulk requests, the coordinating node splits the request into smaller "sub-requests" (one per shard) and sends them all in parallel. In such a scenario, some sub-requests can complete successfully while others fail, resulting in a partial rejection. Logstash may then retry the whole bulk.
If you provide your own document_id, it should fix the duplicate issue, but it will affect your indexing performance.
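
If you do decide to set your own document_id, a common pattern (just a minimal sketch, not your exact pipeline; the hosts value and index name below are placeholders) is to hash the event with the fingerprint filter and use the hash as the document ID, so a retried bulk overwrites the earlier copy instead of creating a new document:

filter {
    fingerprint {
        source => ["message"]                  # fields that identify the event; add more if one field is not unique enough
        target => "[@metadata][fingerprint]"   # @metadata is not indexed, so the hash stays out of the document
        method => "SHA256"
    }
}

output {
    elasticsearch {
        hosts       => [ "https://localhost:9200" ]
        index       => "rq-%{[client]}-%{+YYYY.MM.dd}"
        document_id => "%{[@metadata][fingerprint]}"
    }
}

Keep in mind that genuinely identical lines in the source file would then also collapse into a single document, since they produce the same hash.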


That is a significant difference in ingest time, which suggests it is not due to retries when indexing into Elasticsearch. I would recommend searching for other log entries from that file around that time and checking whether those are also duplicated. If it is the entire file, it would seem like something happened at the file system level that caused the file to be reprocessed, but it is hard to tell without being able to investigate the data directly.
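
For example, something along these lines would show which messages from that file exist more than once (just a sketch; the index pattern, the file path and the message.keyword sub-field are assumptions based on a typical Filebeat/ECS setup, so adjust them to your indices and mapping):

GET rq-iis-*/_search
{
    "size": 0,
    "query": {
        "term": { "log.file.path": "D:\\logs\\example.log" }
    },
    "aggs": {
        "repeated_messages": {
            "terms": {
                "field": "message.keyword",
                "min_doc_count": 2,
                "size": 50
            }
        }
    }
}

Each bucket returned is a message that was indexed more than once, and comparing the ingest times of those documents should tell you whether the whole file or only a few lines were reprocessed.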

Hi

Thank you for your answer. I checked the file itself and there are no duplicate lines.

Are you saying that this is the only document that has been duplicated in Elasticsearch from that file?

No, we have for example 6 identical log entries with different ingest times.

Yes, but is it just one entry that has been duplicated 6 times or have ALL entries (at least the initial ones) been duplicated as well?

It is one entry from the log file that has been duplicated 6 times in Elasticsearch.

If that is one of only a few duplicated entries from the file, I am not sure where the issue is.

Can you share your full logstash pipeline?

Hello, see below.

Thank you

input { beats { port => 5044 } }

filter {

    if [fields][client] {
        mutate {
            add_field => { "client" => "%{[fields][client]}" }
            remove_field => ["[fields][client]"]
        }
        if [fields][indexname] {
            mutate {
                add_field => { "indexname" => "%{[fields][indexname]}" }
                remove_field => ["[fields][indexname]"]
            }
        }
    }

    if [client] == "wso2" {

        ######### Extracting message elements #########

        grok {
            patterns_dir => [ "/etc/logstash/patterns/extra_patterns" ]

            #wso2carbon-log
            match => { "message" => "TID:%{SPACE}\[(%{INT:tenant-id})?\]%{SPACE}\[(%{WORD:server-type})?\]%{SPACE}\[%{TIMESTAMP_ISO8601:date-time}\]%{SPACE}%{LOGLEVEL:level}%{SPACE}{%{JAVACLASS:logger-name}}%{SPACE}-%{SPACE}%{LOGMESSAGE:log-message}((\r?\n)%{STACKTRACE:stacktrace})?" }

            #wso2errors-log, wso2-service-log
            match => { "message" => "%{TIMESTAMP_ISO8601:date-time}%{SPACE}\[%{NOTSPACE:server-type}\]%{SPACE}\[%{GREEDYDATA:logger-name}\]%{SPACE}%{LOGLEVEL:level}%{SPACE}%{NOTSPACE:component}%{SPACE}(%{LOGMESSAGE:log-message})?((\r?\n)%{STACKTRACE:stacktrace})?" }

            #wso2httpaccessmanagementconsole-log
            match => { "message" => "%{IP:ip-address}%{SPACE}-%{SPACE}-%{SPACE}\[%{HTTPDATE:date-time}\]%{SPACE}%{QS:method}%{SPACE}%{INT:status}%{SPACE}%{NOTSPACE:time-taken}%{SPACE}%{QS:uri-stem}%{SPACE}%{QS:user-agent}" }

            #wso2trace-log
            match => { "message" => "%{TIME:time}%{SPACE}\[-\]%{SPACE}\[%{NOTSPACE:logger-name}\]%{SPACE}%{LOGLEVEL:level}%{SPACE}%{NOTSPACE:component}%{SPACE}%{LOGMESSAGE:log-message}" }

            #ws02patches-log
            match => { "message" => "\[%{TIMESTAMP_ISO8601:date-time}\]%{SPACE}%{LOGLEVEL:level}%{SPACE}\{%{JAVACLASS:logger-name}\}%{SPACE}-%{SPACE}(%{LOGMESSAGE:log-message})?" }

            #wso2audit-log
            match => { "message" => "\[%{TIMESTAMP_ISO8601:date-time}\]%{SPACE}%{LOGLEVEL:level}%{SPACE}-%{SPACE}(%{LOGMESSAGE:log-message})?" }

            #wso2atomiktransaction-log
            match => { "message" => "%{CAPLOGLEVEL:level}%{SPACE}%{LOGMESSAGE:log-message}((\r?\n)%{STACKTRACE:stacktrace})?" }
        }

        ######### Updating timestamp field #########

        if [type] == "wso2trace_log" {
            grok {
                match => { "@timestamp" => "%{DATE:date}[T ]%{TIME}" }
                add_field => { "date-time" => "%{date}:%{time}" }
            }
        }

        date {
            match => [ "date-time", "yyyy-MM-dd HH:mm:ss,SSS", "yyyy-MM-dd HH:mm:ss", "dd/MMM/yyy:HH:mm:ss Z", "yy-MM-dd:HH:mm:ss,SSS" ]
            target => "@timestamp"
        }

        ######### Removing unused fields #########

        if [type] == "wso2httpaccessmanagementconsole_log" {
            mutate {
                remove_field => [ "message" ]
            }
        }

        mutate {
            update => { "message" => "%{log-message}" }
            remove_field => [ "date", "time", "date-time", "log-message" ]
        }
    }

    if [event][module] == "iis" {
        if [client] {
            mutate {
                update => { "client" => "iis" }
            }
        } else {
            mutate {
                add_field => { "client" => "iis" }
            }
        }
        mutate {
            lowercase => [ "indexname" ]
        }
    }
}

output {

    if [client] {

        if [client] == "iis" {
            if [indexname] {
                elasticsearch {
                    hosts    => [ " " ]
                    index    => "rq-%{[client]}-%{[indexname]}-%{+YYYY.MM.dd}"
                    cacert   => " "
                    user     => " "
                    password => " "
                }
            } else {
                elasticsearch {
                    hosts    => [ " " ]
                    index    => "rq-%{[client]}-%{+YYYY.MM.dd}"
                    cacert   => " "
                    user     => " "
                    password => " "
                }
            }
        }

        if [client] == "winlog" {
            if [indexname] {
                elasticsearch {
                    hosts    => [ " " ]
                    index    => "rq-%{[client]}-%{[indexname]}-%{+YYYY.MM}"
                    cacert   => " "
                    user     => " "
                    password => " "
                }
            }
        } else {
            if [indexname] {
                elasticsearch {
                    hosts    => [ " " ]
                    index    => "rq-%{[client]}-%{[indexname]}-%{+YYYY.MM.dd}"
                    cacert   => " "
                    user     => " "
                    password => " "
                }
            } else {
                elasticsearch {
                    hosts    => [ " " ]
                    index    => "rq-%{[client]}-%{+YYYY.MM.dd}"
                    cacert   => " "
                    user     => " "
                    password => " "
                }
            }
        }

    } else {
        elasticsearch {
            hosts    => [ " " ]
            index    => "rq-%{+YYYY.MM.dd}"
            cacert   => " "
            user     => " "
            password => " "
        }
    }
}
