Logstash duplicating records while reading from Elasticsearch and writing to BigQuery

Hi, everyone.

Every time Logstash runs, it writes all records from my Elasticsearch index to my BigQuery dataset.
The problem is that the records get duplicated on every run.
My goal is for only the records that were updated or inserted in the index to be updated/inserted in BigQuery.

Is that possible?

This is my config file:

    input {
            elasticsearch {
                    hosts => ["https://myclusteraddress:9200/"]
                    index => "myindex*"
                    user => "myusename"
                    password => "mypassword"
                    docinfo => true
            }
    }

    filter {

        mutate {

            join => { "originResponseFiles" => "," }

            rename => {
                "[account][type]"   => "[account][accountType]"
                "[account][agency]" => "[account][accountAgency]"
                "[account][number]" => "[account][accountNumber]"
            }

            remove_field => ["@timestamp", "cardBrand", "@version"]
        }
        ruby {
            code => "
                # copy every field nested under [account] up to the top level,
                # then drop the original [account] object
                event.get('account').each {|k, v|
                    event.set(k, v)
                }
                event.remove('account')
            "
        }
    }

    output {
            #stdout {
            #       codec => rubydebug
            #}
            google_bigquery {
                    project_id => "data-prod-248920"
                    dataset => "sandbox"
                    table_prefix => "retorno_bloqueio_domicilio"
                    batch_size => 1000
                    id => "ES_to_BQ"
                    table_separator => ""
                    csv_schema => "liquidId:STRING,statusDescription:STRING,paymentDate:STRING,rejectionDate:STRING,accountAgency:STRING,accountNumber:STRING,accountType:STRING,accountAccount:STRING,amount:FLOAT,documentNumber:STRING,id:STRING,updatedAt:STRING,originRequestFile:STRING,merchantName:STRING,errors:STRING,expectedDate:STRING,originResponseFiles:STRING,bankName:STRING,status:STRING,type:STRING"
                    json_key_file => "/somepath/somekey.key"
                    error_directory => "logs"
                    date_pattern => ""
                    flush_interval_secs => 30
            }
    }

Thanks!

That's the way the Elasticsearch input works: every run re-reads everything the query matches. You will need to figure out a way to create a unique ID for each event, and then overwrite them on the BigQuery side.
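
For example, something along these lines would carry the Elasticsearch document _id through to the BigQuery row. This is only a sketch: es_doc_id is an illustrative field name that you would also need to add to csv_schema, and depending on the version of the elasticsearch input the metadata path is [@metadata][_id] or [@metadata][input][elasticsearch][_id]. As far as I know the google_bigquery output only inserts rows, so the actual overwrite/dedup would still have to happen inside BigQuery afterwards:

    filter {
        mutate {
            # docinfo => true exposes the source document's _id in the event
            # metadata; copy it into a regular field so it reaches BigQuery.
            # es_doc_id is an illustrative name, not something the plugin requires.
            add_field => { "es_doc_id" => "%{[@metadata][_id]}" }
        }
    }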

I do have a unique id (the "id" column). I even have a column with the date of the last update.

I can work some logic on that.
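
For instance, something like this might work as the input (just a sketch: I'm assuming "updatedAt" is a date field in the index, and the one-hour look-back window has to match the schedule):

    input {
        elasticsearch {
            hosts => ["https://myclusteraddress:9200/"]
            index => "myindex*"
            user => "myusename"
            password => "mypassword"
            docinfo => true
            # run hourly and only pull documents changed since the last run;
            # the updatedAt field name and the 1h window are assumptions
            schedule => "0 * * * *"
            query => '{ "query": { "range": { "updatedAt": { "gte": "now-1h" } } } }'
        }
    }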

Do you know how I can make the BigQuery output plugin just update a document based on its document id?

Thanks.

I don't, sorry.
