Es_bulk unique uid creates multiple duplicates

Hi,

I have run the following Logstash pipeline and found that it creates duplicates.

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://myURLToTheDB"
        jdbc_user => "myuser"
        jdbc_password => "mypassword"
        last_run_metadata_path => "/etc/logstash/lastrun/.last_jdbc_run_test-nt-controller-rawdata"
        jdbc_driver_library => "/usr/share/logstash/lib/postgresql-9.4.1208.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "10000"
#        schedule => "* * * * *" 
        record_last_run => true
#        statement => "SELECT p.name as providerName, p.id as providerId, c.notes as controllerNotes, cd.content as content, cd.updatedOn as updatedOn, cd.createdOn as createdOn from providers p inner join controllers c on p.id = c.provider_id inner join controllerData cd on cd.deviceid = c.id where cd.updatedOn > :sql_last_value"
        statement => "SELECT p.name as providerName, p.id as providerId, c.notes as controllerNotes, cd.content as content, cd.updatedOn as updatedOn, cd.createdOn as createdOn from providers p inner join controllers c on p.id = c.provider_id inner join controllerData cd on cd.deviceid = c.id"
        type => "test-nt-controller-rawdata"
    }
}
filter {
    if [type] == "test-nt-controller-rawdata" {
        json { source => "content" }
        mutate {
            remove_field => [ "content" ]
            add_field => { "uid" => "%{deviceId}%{channelId}%{timestamp}" }
        }
        if ![intervalStart] {
            ruby {
                code => "event.set('intervalStart',Time.parse(event.get('timestamp')) - 15*60)"
            }
        }
        date {
            match => ["timestamp","ISO8601"]
        }
    }
}
output {
    if [type] == "test-nt-controller-rawdata" {
        elasticsearch {
            hosts => ["https://theId.eu-west-1.aws.found.io:9243"]
            user => "myelasticuser"
            password => "myPasswordgoeshere"
            index => "test-nt-controller-data-%{+YYYY}"
            document_id => "%{uid}"
            document_type => "nt-controller-data"
            codec => "es_bulk"
        }
#        stdout { codec => rubydebug }
    }
}

Then I search for duplicates using the following query:

{
  "size": 0,
  "aggs": {
    "duplicateCount": {
      "terms": {
        "script": "doc['deviceId.keyword'].value + doc['channelId.keyword'].value + doc['timestamp'].value",
        "min_doc_count": 2
      },
      "aggs": {
        "duplicateDocuments": {
          "top_hits": {}
        }
      }
    }
  }
}
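That aggregation groups documents by the concatenated deviceId + channelId + timestamp key and keeps only buckets with at least two hits (min_doc_count: 2); note the bucket keys render the timestamp as epoch millis, which is why they differ from the ISO-formatted uid values. The same grouping can be sanity-checked in Python (sample docs are made up):

```python
from collections import Counter

def duplicate_buckets(docs, min_doc_count=2):
    """Group docs by deviceId + channelId + timestamp, like the terms script,
    keeping only keys that occur at least min_doc_count times."""
    counts = Counter(d["deviceId"] + d["channelId"] + d["timestamp"] for d in docs)
    return {key: n for key, n in counts.items() if n >= min_doc_count}

docs = [
    {"deviceId": "a", "channelId": "ct1", "timestamp": "t1"},
    {"deviceId": "a", "channelId": "ct1", "timestamp": "t1"},  # duplicate
    {"deviceId": "b", "channelId": "ct2", "timestamp": "t2"},
]
print(duplicate_buckets(docs))  # {'act1t1': 2}
```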

-- I'm putting the result in a reply as it doesn't fit in the question.
Am I missing anything? From what I understood, if you set a document_id and do a bulk index, it should create or update documents with the same id...

This is on Elastic Cloud version 5.2.1, using Logstash 5.2.1 as well.

The response from the request returns a long list like this:

{
  "took": 5179,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 2170001,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "duplicateCount": {
      "doc_count_error_upper_bound": 30,
      "sum_other_doc_count": 2169918,
      "buckets": [
        {
          "key": "00:04:a3:6d:48:a5ct121484831701000",
          "doc_count": 10,
          "duplicateDocuments": {
            "hits": {
              "total": 10,
              "max_score": 1,
              "hits": [
                {
                  "_index": "nt-controller-data-2017",
                  "_type": "nt-controller-data",
                  "_id": "AVpB5uuI_-vPxt6-APbO",
                  "_score": 1,
                  "_source": {
                    "averageCurrentInAmps": 6.174501,
                    "updatedon": "2017-01-19T13:35:16.184Z",
                    "type": "nt-controller-rawdata",
                    "energyUsedInWattHours": 373.174408,
                    "providername": "Boots",
                    "createdon": null,
                    "deviceId": "00:04:a3:6d:48:a5",
                    "uid": "00:04:a3:6d:48:a5ct122017-01-19T13:15:01Z",
                    "averageVoltageInVolts": 243.255035,
                    "@timestamp": "2017-01-19T13:15:01.000Z",
                    "providerid": 11,
                    "controllernotes": "client26_boots / Boots - Quorn",
                    "@version": "1",
                    "averageRealPowerInWatts": 1492.697632,
                    "intervalStart": "2017-01-19T13:00:01.000Z",
                    "channelId": "ct12",
                    "timestamp": "2017-01-19T13:15:01Z",
                    "ctMultiplier": 1.253
                  }
                },
                {
                  "_index": "nt-controller-data-2017",
                  "_type": "nt-controller-data",
                  "_id": "AVpBs7Xd_-vPxt6---GI",
                  "_score": 1,
                  "_source": {
                    "averageCurrentInAmps": 6.174501,
                    "updatedon": "2017-01-19T13:35:16.184Z",
                    "type": "nt-controller-rawdata",
                    "energyUsedInWattHours": 373.174408,
                    "providername": "Boots",
                    "createdon": null,
                    "deviceId": "00:04:a3:6d:48:a5",
                    "uid": "00:04:a3:6d:48:a5ct122017-01-19T13:15:01Z",
                    "averageVoltageInVolts": 243.255035,
                    "@timestamp": "2017-01-19T13:15:01.000Z",
                    "providerid": 11,
                    "controllernotes": "client26_boots / Boots - Quorn",
                    "@version": "1",
                    "averageRealPowerInWatts": 1492.697632,
                    "intervalStart": "2017-01-19T13:00:01.000Z",
                    "channelId": "ct12",
                    "timestamp": "2017-01-19T13:15:01Z",
                    "ctMultiplier": 1.253
                  }
                },
                {
                  "_index": "nt-controller-data-2017",
                  "_type": "nt-controller-data",
                  "_id": "AVpBtC-__-vPxt6-_Buc",
                  "_score": 1,
                  "_source": {
                    "averageCurrentInAmps": 6.174501,
                    "updatedon": "2017-01-19T13:35:16.184Z",
                    "type": "nt-controller-rawdata",
                    "energyUsedInWattHours": 373.174408,
                    "providername": "Boots",
                    "createdon": null,
                    "deviceId": "00:04:a3:6d:48:a5",
                    "uid": "00:04:a3:6d:48:a5ct122017-01-19T13:15:01Z",
                    "averageVoltageInVolts": 243.255035,
                    "@timestamp": "2017-01-19T13:15:01.000Z",
                    "providerid": 11,
                    "controllernotes": "client26_boots / Boots - Quorn",
                    "@version": "1",
                    "averageRealPowerInWatts": 1492.697632,
                    "intervalStart": "2017-01-19T13:00:01.000Z",
                    "channelId": "ct12",
                    "timestamp": "2017-01-19T13:15:01Z",
                    "ctMultiplier": 1.253
                  }
                }
              ]
            }
          }
        },
        {
          "key": "00:04:a3:6d:40:c5ct561484017200000",
          "doc_count": 9,
          "duplicateDocuments": {
            "hits": {
              "total": 9,
              "max_score": 1,
              "hits": [
                {
                  "_index": "nt-controller-data-2017",
                  "_type": "nt-controller-data",
                  "_id": "AVpCW63O_-vPxt6-CGdv",
                  "_score": 1,
                  "_source": {
                    "averageCurrentInAmps": 30.668692,
                    "updatedon": "2017-01-10T03:14:30.425Z",
                    "type": "nt-controller-rawdata",
                    "energyUsedInWattHours": 1671.182251,
                    "providername": "Risley Park",
                    "createdon": null,
                    "deviceId": "00:04:a3:6d:40:c5",
                    "uid": "00:04:a3:6d:40:c5ct562017-01-10T03:00:00Z",
                    "averageVoltageInVolts": 240.721695,
                    "@timestamp": "2017-01-10T03:00:00.000Z",
                    "providerid": 4,
                    "controllernotes": "client07_risley_park / Risley Park",
                    "@version": "1",
                    "averageRealPowerInWatts": 6684.729004,
                    "intervalStart": "2017-01-10T02:45:00.000Z",
                    "channelId": "ct56",
                    "timestamp": "2017-01-10T03:00:00Z",
                    "ctMultiplier": 1.253
                  }
                }...
              ]
            }
          }
        }

I found this while trying the suggestions from this thread:
https://discuss.elastic.co/t/bulk-logstash-load-from-db-cluster-3c1033/75075?source_topic_id=75351

which I think partly provoked this:
https://discuss.elastic.co/t/cluster-with-id-3c1033-stuck/75313?source_topic_id=75351

The ids of the documents you highlighted do not seem to be based on the uid field. Do you by any chance have additional config files in your directory that are causing records to be written multiple times?

As not all the documents were indexed due to CPU pressure, I ran the bulk index multiple times.

I basically scheduled it to run every 5 hours.

Could it be that they are overlapping (multiple instances running at the same time) and, for that reason, ignoring the UID?

Thanks!

You should not have codec => "es_bulk" in your elasticsearch output. Unless that is messing things up somehow, the document id would be set by Logstash, so it should not matter if you run the job multiple times. Check your config directory for other config files. Logstash will concatenate all of them, so if you have an older version that does not set the document_id, that could result in duplicates.
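The idempotency argument here can be illustrated with a toy in-memory "index" (Python, purely illustrative — a dict standing in for the Elasticsearch index):

```python
import uuid

def bulk_index(index, docs, document_id_field=None):
    """Index docs into a dict keyed by _id. With a fixed id field, reruns
    overwrite the same entry; without one, each run generates fresh auto
    ids and duplicates accumulate."""
    for doc in docs:
        _id = doc[document_id_field] if document_id_field else uuid.uuid4().hex
        index[_id] = doc

docs = [{"uid": "dev1ct1t1", "v": 1}]

with_id = {}
bulk_index(with_id, docs, "uid")
bulk_index(with_id, docs, "uid")   # rerun: same _id, entry overwritten
print(len(with_id))                # 1

auto_id = {}
bulk_index(auto_id, docs)
bulk_index(auto_id, docs)          # rerun: new auto ids, duplicated
print(len(auto_id))                # 2
```

This mirrors why auto-generated _id values like "AVpB5uuI_-vPxt6-APbO" in the response above are a red flag: they suggest the document_id setting was not actually applied.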

OK, I'll try the bulk insert tonight, but with codec => "json".

I also suspect the "es_bulk" codec, but thought it would be useful when indexing the nearly 2,000,000 records.

What is the use of "es_bulk" as a codec otherwise?

Thanks @Christian_Dahlqvist I will reply with my findings.

The elasticsearch plugin handles this internally, so you should not use any codec.

ok, I will then remove that "codec"
Ta

Ok, me silly.

Apologies, but my actual config was missing the document_id part in the output... even though I had included it in the first message here.

So no problem here. Thanks again!
