Duplicate records in Elasticsearch

Hello
I'm facing an issue with duplicate records in my index on rollover.

I thought that using a rollover alias meant only one index is the "write" index at a time, so a batch of records being indexed should never end up written to two indices.
I have also set the document ID to the unique ID from my DB table, so that if the same record is sent to Elasticsearch again it is overwritten rather than indexed as a new document.

Logstash pipeline configuration:

input {
    jdbc {
        jdbc_driver_library       => "**"
        jdbc_driver_class         => "**"
        jdbc_connection_string    => "**"
        jdbc_user                 => "**"
        jdbc_password             => "**"
        jdbc_validate_connection  => true
        jdbc_default_timezone     => "Asia/Riyadh"
        jdbc_validation_timeout   => 60
        schedule                  => "*/1 * * * *"
        connection_retry_attempts => 1
        last_run_metadata_path    => "**/metadata/service_logger_lastrun"
        tracking_column           => "call_start_time"
        tracking_column_type      => "timestamp"
        use_column_value          => true
        statement => "
            SELECT
                ID,
                CALLID AS SERVICE_DETAILS,
                SERVED_NUMBER,
                DURATION,
                DBTIMESEQ AS CALL_START_TIME,
                SERVICE_NUMBER AS SERVICE_NAME,
                CASE
                    WHEN STATUS = 'SUCCESS' AND JSON_VALUE(response, '$.Response.status.error.errorDescription') = 'No data found' THEN 'No Data Found'
                    WHEN STATUS <> 'SUCCESS' THEN STATUS
                    WHEN RESPONSE IS NULL AND STATUS = 'SUCCESS' THEN STATUS
                    WHEN JSON_VALUE(response, '$.Response.status.result') IS NOT NULL THEN JSON_VALUE(response, '$.Response.status.result')
                    WHEN JSON_VALUE(response, '$.eaiResponse.status.result') IS NOT NULL THEN JSON_VALUE(response, '$.eaiResponse.status.result')
                    ELSE STATUS
                END AS STATUS,
                CASE
                    WHEN JSON_VALUE(response, '$.eaiResponse.status.error.errorDescription') IS NOT NULL THEN JSON_VALUE(response, '$.eaiResponse.status.error.errorDescription')
                    WHEN JSON_VALUE(response, '$.Response.status.error.errorDescription') IS NOT NULL THEN JSON_VALUE(response, '$.Response.status.error.errorDescription')
                    ELSE NULL
                END AS ERROR_DESCRIPTION,
                CASE
                    WHEN JSON_VALUE(response, '$.eaiResponse.status.error.errorCategory') IS NOT NULL THEN JSON_VALUE(response, '$.eaiResponse.status.error.errorCategory')
                    WHEN JSON_VALUE(response, '$.Response.status.error.errorCategory') IS NOT NULL THEN JSON_VALUE(response, '$.Response.status.error.errorCategory')
                    ELSE NULL
                END AS ERROR_CATEGORY,
                CASE
                    WHEN JSON_VALUE(response, '$.eaiResponse.status.error.errorCode') IS NOT NULL THEN JSON_VALUE(response, '$.eaiResponse.status.error.errorCode')
                    WHEN JSON_VALUE(response, '$.Response.status.error.errorCode') IS NOT NULL THEN JSON_VALUE(response, '$.Response.status.error.errorCode')
                    ELSE NULL
                END AS ERROR_CODE,
                CASE
                    WHEN JSON_VALUE(response, '$.eaiResponse.status.error.errorType') IS NOT NULL THEN JSON_VALUE(response, '$.eaiResponse.status.error.errorType')
                    WHEN JSON_VALUE(response, '$.Response.status.error.errorType') IS NOT NULL THEN JSON_VALUE(response, '$.Response.status.error.errorType')
                    ELSE NULL
                END AS ERROR_TYPE,
                ERRORS_DESCRIPTION,
                REQUEST,
                RESPONSE,
                CUSTOM1,
                CUSTOM2
            FROM SERVICE_LOGGER
            WHERE DBTIMESEQ > :sql_last_value
        "
        tags => ["service_logger"]
    }
}
filter {
}
output {
    elasticsearch {
        hosts   => ["**"]
        timeout => 60
        #index => "bk_service_logger-%{+YYYY.MM.dd}"
        document_id => "%{id}"
        user     => "**"
        password => "**"
        ssl      => true
        ssl_certificate_verification => true
        ssl_certificate_authorities  => ['**']
        template_name      => "service_logger_template"
        ilm_enabled        => true
        ilm_rollover_alias => "bk_service_logger"
        index              => "bk_service_logger"
        ilm_pattern        => "{now/d{yyyy.MM.dd}}-000001"
        ilm_policy         => "service_logger_90d_ILM_policy"
        manage_template    => false
    }
}

I have read an article about deduplication that mentioned the "fingerprint" filter, but as I understand it, a fingerprint-based ID is still only unique within a single index. Rolling over to a new index would still treat the same unique ID (already present in the previous index) as new in the new index, and hence the duplication occurs.
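
For reference, the kind of setup the article describes looks roughly like this (just a sketch, not something I am running; the [@metadata][generated_id] field name is only for illustration):

filter {
    fingerprint {
        # Hash the DB primary key into a stable, repeatable ID.
        source => ["id"]
        target => "[@metadata][generated_id]"
        method => "SHA256"
    }
}
output {
    elasticsearch {
        # Same effect as document_id => "%{id}" above: the ID still only deduplicates within one index.
        document_id => "%{[@metadata][generated_id]}"
        # ... other settings as in my pipeline ...
    }
}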

Is there any way to solve this?

Er, where and how specifically have you configured this? I don’t see that in what you shared. Maybe I missed it?

Template + ILM, I suppose.

Template settings:

{
  "index": {
    "lifecycle": {
      "name": "service_logger_90d_ILM_policy",
      "rollover_alias": "bk_service_logger"
    },
    "number_of_replicas": "0"
  }
}

ILM policy:

GET _ilm/policy/service_logger_90d_ILM_policy
{
  "service_logger_90d_ILM_policy": {
    "version": 2,
    "modified_date": "2025-07-31T11:34:05.137Z",
    "policy": {
      "phases": {
        "warm": {
          "min_age": "25h",
          "actions": {
            "set_priority": {
              "priority": 50
            },
            "forcemerge": {
              "max_num_segments": 1
            }
          }
        },
        "hot": {
          "min_age": "0ms",
          "actions": {
            "set_priority": {
              "priority": 100
            },
            "rollover": {
              "max_age": "24h",
              "max_primary_shard_size": "50gb"
            }
          }
        },
        "delete": {
          "min_age": "90d",
          "actions": {
            "delete": {
              "delete_searchable_snapshot": true
            }
          }
        }
      }
    },
    "in_use_by": {
      "indices": [
        "bk_service_logger-2025.06.08-000018",
        "bk_service_logger-2025.05.25-000004",
        "bk_service_logger-2025.06.09-000019",
        "bk_service_logger-2025.05.27-000006",
        "bk_service_logger-2025.06.30-000036",
        "bk_service_logger-2025.05.26-000005",
        "bk_service_logger-2025.06.05-000015",
        "bk_service_logger-2025.05.22-000001",
        "bk_service_logger-2025.06.06-000016",
        "bk_service_logger-2025.05.24-000003",
        "bk_service_logger-2025.05.23-000002",
        "bk_service_logger-2025.06.07-000017",
        "bk_service_logger-2025.07.10-000046",
        "bk_service_logger-2025.07.11-000047",
        "bk_service_logger-2025.05.29-000008",
        "bk_service_logger-2025.05.28-000007",
        "bk_service_logger-2025.07.13-000049",
        "bk_service_logger-2025.07.12-000048",
        "bk_service_logger-2025.07.25-000061",
        "bk_service_logger-2025.07.26-000062",
        "bk_service_logger-2025.07.24-000060",
        "bk_service_logger-2025.06.02-000012",
        "bk_service_logger-2025.06.03-000013",
        "bk_service_logger-2025.06.04-000014",
        "bk_service_logger-2025.06.01-000011",
        "bk_service_logger-2025.07.21-000057",
        "bk_service_logger-2025.05.31-000010",
        "bk_service_logger-2025.07.23-000059",
        "bk_service_logger-2025.06.16-000022",
        "bk_service_logger-2025.06.18-000024",
        "bk_service_logger-2025.07.02-000038",
        "bk_service_logger-2025.06.23-000029",
        "bk_service_logger-2025.07.28-000064",
        "bk_service_logger-2025.06.21-000027",
        "bk_service_logger-2025.07.18-000054",
        "bk_service_logger-2025.07.16-000052",
        "bk_service_logger-2025.07.14-000050",
        "bk_service_logger-2025.06.10-000020",
        "bk_service_logger-2025.07.30-000066",
        "bk_service_logger-2025.06.24-000030",
        "bk_service_logger-2025.06.25-000031",
        "bk_service_logger-2025.06.26-000032",
        "bk_service_logger-2025.06.27-000033",
        "bk_service_logger-2025.06.28-000034",
        "bk_service_logger-2025.06.29-000035",
        "bk_service_logger-2025.07.09-000045",
        "bk_service_logger-2025.07.08-000044",
        "bk_service_logger-2025.07.06-000042",
        "bk_service_logger-2025.07.07-000043",
        "bk_service_logger-2025.07.04-000040",
        "bk_service_logger-2025.07.05-000041",
        "bk_service_logger-2025.07.20-000056",
        "bk_service_logger-2025.07.22-000058",
        "bk_service_logger-2025.06.15-000021",
        "bk_service_logger-2025.06.17-000023",
        "bk_service_logger-2025.06.20-000026",
        "bk_service_logger-2025.06.19-000025",
        "bk_service_logger-2025.07.01-000037",
        "bk_service_logger-2025.07.03-000039",
        "bk_service_logger-2025.07.29-000065",
        "bk_service_logger-2025.06.22-000028",
        "bk_service_logger-2025.07.27-000063",
        "bk_service_logger-2025.07.19-000055",
        "bk_service_logger-2025.07.17-000053",
        "bk_service_logger-2025.05.30-000009",
        "bk_service_logger-2025.07.15-000051"
      ],
      "data_streams": [],
      "composable_templates": [
        "service_logger_template"
      ]
    }
  }
}

Sorry, still not getting it.

The _id field on your Elasticsearch indices exactly matches a specific database column, right? One which is some sort of primary key, so it is definitely unique in your SQL database and never changes for the lifetime of that record.

Is that all correct?

That is correct, and there are no duplicates throughout the day.
It is only during the rollover that I find these duplicates.

Hello,

Is this issue similar to the one discussed below:

Thanks!!

@Tortoise Probably yes, but my data is not updated; it does not change once inserted into the database.

If your database query can return the same id on different executions, then this is expected, as that id will not be present in the new index after rollover.

A custom id in Elasticsearch is unique per index; if you use rollover you can have duplicates.
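
If you want to confirm it, a quick check (just a sketch, assuming your primary key is indexed as an aggregatable field named id) is a terms aggregation over the alias, which will surface any id that now has more than one document across the backing indices:

GET bk_service_logger/_search
{
  "size": 0,
  "aggs": {
    "duplicate_ids": {
      "terms": {
        "field": "id",
        "min_doc_count": 2,
        "size": 100
      }
    }
  }
}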


Hello @leandrojmp,
I have tested this without using the document ID, and I found that the issue is with "sql_last_value" itself: it is not actually using the highest value of the DBTIMESEQ field, just the value from the last record selected:

logstash-plain.log:

[2025-08-03T12:04:04,618][INFO ][logstash.inputs.jdbc     ][servicelogger][a1b2bcc64086ad30b261e9dfffbb0641ed39528472c43109600f8b42c079f564] (4.186133s) 
					SELECT 
				********
			FROM SERVICE_LOGGER
			WHERE DBTIMESEQ > TIMESTAMP '2025-08-03 11:28:13.187270 +03:00'
			ORDER BY DBTIMESEQ DESC
		  
[2025-08-03T12:05:04,874][INFO ][logstash.inputs.jdbc     ][servicelogger][a1b2bcc64086ad30b261e9dfffbb0641ed39528472c43109600f8b42c079f564] (4.362560s) 
					SELECT 
				*********
			FROM SERVICE_LOGGER
			WHERE DBTIMESEQ > TIMESTAMP '2025-08-03 11:28:13.767851 +03:00'
			ORDER BY DBTIMESEQ DESC
			
			
			
[2025-08-03T12:09:04,881][INFO ][logstash.inputs.jdbc     ][servicelogger][a1b2bcc64086ad30b261e9dfffbb0641ed39528472c43109600f8b42c079f564] (4.020043s) 
					SELECT 
				****
			FROM SERVICE_LOGGER
			WHERE DBTIMESEQ > TIMESTAMP '2025-08-03 11:28:29.865622 +03:00'
			ORDER BY DBTIMESEQ DESC			
			
[2025-08-03T12:10:05,292][INFO ][logstash.inputs.jdbc     ][servicelogger][a1b2bcc64086ad30b261e9dfffbb0641ed39528472c43109600f8b42c079f564] (4.293522s) 
					SELECT 
				****
			FROM SERVICE_LOGGER
			WHERE DBTIMESEQ > TIMESTAMP '2025-08-03 11:28:34.740541 +03:00'
			ORDER BY DBTIMESEQ DESC

At timestamp "12:10:05" it is still querying data with DBTIMESEQ "11:28:34" or higher, even though it has already indexed records with DBTIMESEQ around "12:09".


I tried using "ORDER BY" but it doesn't work. How can I force it to use the most recent value of the timestamp field?

Any update here? It's an interesting problem.

If the SQL query can return the same "record" multiple times, you are going to have a risk of that record (document) appearing in multiple rollover indices. And if rows do not appear in the SQL database in the right order, you also risk incomplete data in Elasticsearch.

Er, don't you want to use ORDER BY DBTIMESEQ ASC? You want the last record processed to be the "latest" record, right?
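
For illustration, just a sketch with the column list elided, so that the last row processed carries the highest DBTIMESEQ and sql_last_value then advances correctly:

input {
    jdbc {
        # ... connection and tracking settings exactly as in your pipeline ...
        tracking_column      => "call_start_time"
        tracking_column_type => "timestamp"
        use_column_value     => true
        statement => "
            SELECT
                ID,
                DBTIMESEQ AS CALL_START_TIME
                -- other columns as in your pipeline
            FROM SERVICE_LOGGER
            WHERE DBTIMESEQ > :sql_last_value
            ORDER BY DBTIMESEQ ASC
        "
    }
}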

Yes, that’s what I did in the end. I was just waiting for a couple of days to check the data, and it’s working perfectly fine! Thank you :folded_hands:

Great that you reached a solution. But please also be careful of the following:

You have SELECT ... DBTIMESEQ AS CALL_START_TIME in your SQL, so if any records do not appear in the database in increasing DBTIMESEQ order, then you risk incomplete data. As this would not throw any error at all, and is perhaps unlikely and therefore uncommon, it would be very difficult to spot if it did happen.
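
If you ever want a rough way to check for that, a per-day row count on the database side (just a sketch, Oracle-style syntax since your query uses JSON_VALUE and TIMESTAMP literals) can be compared against the per-day document count on the bk_service_logger alias; any gap would show up there:

-- Sketch: daily row counts in the source table, to compare against
-- daily document counts on the bk_service_logger alias in Elasticsearch.
SELECT TRUNC(DBTIMESEQ) AS log_day,
       COUNT(*)         AS row_count
FROM   SERVICE_LOGGER
GROUP  BY TRUNC(DBTIMESEQ)
ORDER  BY log_day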


That is also correct; I just discovered it while comparing data.

Really appreciate your help!! @RainTown
