Hi, everyone.
Every time Logstash runs, it writes all records from my Elasticsearch index to my BigQuery table, so the records get duplicated on every run.
What I want is for only the records that were inserted or updated in the index since the last run to be inserted/updated in BigQuery.
Is that possible?
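From what I've read, the elasticsearch input plugin supports schedule and query options, so I imagine the incremental pull could look something like this. This is just an untested sketch: it assumes updatedAt is mapped as a date in the index, and the 5-minute window is only illustrative and would have to match the schedule.

input {
  elasticsearch {
    hosts    => ["https://myclusteraddress:9200/"]
    index    => "myindex*"
    user     => "myusername"
    password => "mypassword"
    docinfo  => true
    # Untested: re-run the query every 5 minutes...
    schedule => "*/5 * * * *"
    # ...and only fetch documents whose updatedAt falls inside that window.
    query    => '{ "query": { "range": { "updatedAt": { "gte": "now-5m" } } } }'
  }
}

Even with that, I suspect records that were updated in Elasticsearch would just be appended again as new rows on the BigQuery side.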
This is my current config file:
input {
  elasticsearch {
    hosts    => ["https://myclusteraddress:9200/"]
    index    => "myindex*"
    user     => "myusername"
    password => "mypassword"
    docinfo  => true
  }
}
filter {
  mutate {
    # Flatten the array of response files into one comma-separated string.
    join => { "originResponseFiles" => "," }
    # Hash form of rename; repeating the option in array form is deprecated.
    rename => {
      "[account][type]"   => "[account][accountType]"
      "[account][agency]" => "[account][accountAgency]"
      "[account][number]" => "[account][accountNumber]"
    }
    remove_field => ["@timestamp", "cardBrand", "@version"]
  }
  ruby {
    # Hoist every key under [account] to the top level, then drop the
    # now-empty [account] object; skip events that have no account hash.
    code => "
      account = event.get('account')
      if account.is_a?(Hash)
        account.each { |k, v| event.set(k, v) }
        event.remove('account')
      end
    "
  }
}
output {
  #stdout {
  #  codec => rubydebug
  #}
  google_bigquery {
    project_id          => "data-prod-248920"
    dataset             => "sandbox"
    # With an empty separator and date pattern, every row lands in a single
    # table named exactly after table_prefix.
    table_prefix        => "retorno_bloqueio_domicilio"
    table_separator     => ""
    date_pattern        => ""
    batch_size          => 1000
    id                  => "ES_to_BQ"
    csv_schema          => "liquidId:STRING,statusDescription:STRING,paymentDate:STRING,rejectionDate:STRING,accountAgency:STRING,accountNumber:STRING,accountType:STRING,accountAccount:STRING,amount:FLOAT,documentNumber:STRING,id:STRING,updatedAt:STRING,originRequestFile:STRING,merchantName:STRING,errors:STRING,expectedDate:STRING,originResponseFiles:STRING,bankName:STRING,status:STRING,type:STRING"
    json_key_file       => "/somepath/somekey.key"
    error_directory     => "logs"
    flush_interval_secs => 30
  }
}
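Since the google_bigquery output only appends rows as far as I can tell, I was also thinking of copying the Elasticsearch document id into each row (docinfo => true already exposes it under @metadata), so duplicates could at least be collapsed on the BigQuery side afterwards. Another untested sketch; esDocId is just a placeholder name and would also need to be added to csv_schema:

filter {
  mutate {
    # Copy the source document's _id (exposed by docinfo => true) into the
    # row itself, as a stable key for deduplicating in BigQuery later.
    add_field => { "esDocId" => "%{[@metadata][_id]}" }
  }
}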
Thanks!