Hi,
I have run the following Logstash pipeline and found that it creates duplicate documents.
input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://myURLToTheDB"
    jdbc_user => "myuser"
    jdbc_password => "mypassword"
    last_run_metadata_path => "/etc/logstash/lastrun/.last_jdbc_run_test-nt-controller-rawdata"
    jdbc_driver_library => "/usr/share/logstash/lib/postgresql-9.4.1208.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "10000"
    # schedule => "* * * * *"
    record_last_run => true
    # statement => "SELECT p.name as providerName, p.id as providerId, c.notes as controllerNotes, cd.content as content, cd.updatedOn as updatedOn, cd.createdOn as createdOn from providers p inner join controllers c on p.id = c.provider_id inner join controllerData cd on cd.deviceid = c.id where cd.updatedOn > :sql_last_value"
    statement => "SELECT p.name as providerName, p.id as providerId, c.notes as controllerNotes, cd.content as content, cd.updatedOn as updatedOn, cd.createdOn as createdOn from providers p inner join controllers c on p.id = c.provider_id inner join controllerData cd on cd.deviceid = c.id"
    type => "test-nt-controller-rawdata"
  }
}
filter {
  if [type] == "test-nt-controller-rawdata" {
    json { source => "content" }
    mutate {
      remove_field => [ "content" ]
      # build a deterministic id from the raw event fields
      add_field => { "uid" => "%{deviceId}%{channelId}%{timestamp}" }
    }
    # default intervalStart to 15 minutes before the timestamp when it is missing
    if ![intervalStart] {
      ruby {
        code => "event.set('intervalStart', Time.parse(event.get('timestamp')) - 15*60)"
      }
    }
    date {
      match => ["timestamp", "ISO8601"]
    }
  }
}
output {
  if [type] == "test-nt-controller-rawdata" {
    elasticsearch {
      hosts => ["https://theId.eu-west-1.aws.found.io:9243"]
      user => "myelasticuser"
      password => "myPasswordgoeshere"
      index => "test-nt-controller-data-%{+YYYY}"
      document_id => "%{uid}"
      document_type => "nt-controller-data"
      codec => "es_bulk"
    }
    # stdout { codec => rubydebug }
  }
}
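If I understand the elasticsearch output correctly, every event should go into the bulk request with an explicit _id taken from the uid field, so the action line would look roughly like this (the field values and the year in the index name are made up for illustration):

{ "index" : { "_index" : "test-nt-controller-data-2017", "_type" : "nt-controller-data", "_id" : "abc123chan12017-03-01T10:00:00.000Z" } }
{ "deviceId" : "abc123", "channelId" : "chan1", "timestamp" : "2017-03-01T10:00:00.000Z" }

So two events with the same deviceId, channelId and timestamp should end up with the same _id rather than as separate documents.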
Then I search for duplicates using the following query:
{
  "size": 0,
  "aggs": {
    "duplicateCount": {
      "terms": {
        "script": "doc['deviceId.keyword'].value + doc['channelId.keyword'].value + doc['timestamp'].value",
        "min_doc_count": 2
      },
      "aggs": {
        "duplicateDocuments": {
          "top_hits": {}
        }
      }
    }
  }
}
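In case it matters, the query can be run against the cluster from the output section like this (the wildcard index pattern and the dedup-query.json file name are just for this example; the file holds the aggregation above):

curl -s -u myelasticuser:myPasswordgoeshere \
  -H 'Content-Type: application/json' \
  "https://theId.eu-west-1.aws.found.io:9243/test-nt-controller-data-*/_search?pretty" \
  -d @dedup-query.json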
-- I am posting the result in a reply, as it doesn't fit in the question.
Am I missing anything? From what I understood, if you set a document_id and do a bulk index, it should create or update documents with the same id...
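Just to illustrate the behaviour I am expecting (index name, type and id here follow the patterns from the output section, with made-up values):

PUT test-nt-controller-data-2017/nt-controller-data/abc123chan12017-03-01T10:00:00.000Z
{
  "deviceId": "abc123",
  "channelId": "chan1",
  "timestamp": "2017-03-01T10:00:00.000Z"
}

Running the same PUT a second time only bumps _version on the existing document (the response says "result": "updated") instead of creating a second one, which is what I expected document_id to give me through the bulk indexing as well.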
This is on Elastic Cloud version 5.2.1, using Logstash 5.2.1 as well.