ILM with rollovers breaking data integrity

Hello,

We have an index where we need to continuously write new docs, run full-text searches and update the matched docs with a flag. The data consists of social media posts and news articles with timestamps.

We previously allowed this index to grow infinitely, but this made searches very slow and increased our costs. We identified that retaining just the last 24-48 hrs of data is good enough for business requirements and would keep search speed and costs at an acceptable level.

We tried to achieve this by creating an index with an ILM policy and alias. The policy rolls over the index every hour and deletes it after 1 day. The index remains in the hot phase until deletion.
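For context, the policy is roughly equivalent to the sketch below (created via the Python client; the policy name and connection details are placeholders):

from elasticsearch import Elasticsearch

es = Elasticsearch("https://localhost:9200")  # placeholder connection details

# Roll over the write index every hour; delete each index 1 day after rollover.
# Indices stay in the hot phase until the delete phase runs.
es.ilm.put_lifecycle(
    name="hourly-rollover-1d-retention",  # placeholder policy name
    policy={
        "phases": {
            "hot": {"actions": {"rollover": {"max_age": "1h"}}},
            "delete": {"min_age": "1d", "actions": {"delete": {}}},
        }
    },
)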

However, as we started writing larger volumes to the alias (1M+ docs per day via many streaming bulk writes), we discovered a loss of data integrity, with docs that have different _id getting mixed up. For example, doc 1 originally had text: foo and doc 2 had text: bar. But after writing to the alias, doc 1 had text: bar, and the _version for the doc was also higher than expected.

An important detail is that this is happening even with the search application turned off, i.e. it appears to be an issue with the indexing, not with updates.

Is this known / expected behaviour from such a setup? And is there a more suitable approach for retaining docs for only 24 hours? Any help or insights would be much appreciated!

Hello and welcome,

Can you provide some evidence of this? It is not clear what the issue is here.

How are you indexing the data? Are you using a custom _id? For a doc to be updated, you would need to make a request using the same _id.

Please provide some evidence of the issue, like the documents, and how you are indexing your data.

As you are using rollover, it is worth noting that all writes and updates will go to the most recent index. If you write a document and a rollover then happens before it is subsequently updated, you will likely end up with 2 separate documents in different indices. As this surfaced when you started increasing data volumes, I assume rollover occurred more frequently, making the issue more noticeable?
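To make that concrete, something like the following can happen (index and alias names are made up):

from elasticsearch import Elasticsearch

es = Elasticsearch("https://localhost:9200")  # placeholder connection details

# First write goes through the alias into the current write index, e.g. posts-000001.
es.index(index="posts", id="3905871722299717800", document={"text": "foo"})

# ... an hourly rollover happens here, so the alias now sends writes to posts-000002 ...

# The same _id written again via the alias lands in posts-000002 and creates a
# second document there; the original copy in posts-000001 is left untouched.
es.index(index="posts", id="3905871722299717800", document={"text": "foo", "matched": True})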

Use cases requiring updates do not necessarily play nicely with rollover.

1 Like

I see. So is there a better way to automate time-based deletions for data while allowing updates?

We are indexing the data using the elasticsearch-hadoop connector as the source is a Spark dataframe, but are seeing this issue even if we use the Python client.

We are setting the _id via es.mapping.id as mentioned in the docs, but are seeing the issue even if we remove that line and allow ES to auto-generate the _id.
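The Spark write looks roughly like this (the host, alias and id column name are placeholders; the real dataframe has many more columns, and the elasticsearch-spark jar is on the classpath):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("3905871722299717800", "Какие плюсы членам от БРИКС,особенно России?")],
    ["doc_id", "text"],
)

(df.write
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "https://es-host:9200")  # placeholder host
    .option("es.mapping.id", "doc_id")           # column used as the document _id
    .mode("append")
    .save("prod-matchmaker"))                    # the write alias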

This is happening before any updates - our end goal is to search and update the data, but we've disabled the search application and are still seeing this issue.

Adding a sample original/corrupted doc below:

Original (in Spark dataframe)

{
    "_id": "3905871722299717800",
    "text_meta": "Тот Самый Сорванец Ответы@Mail.Ru: Все категории Какие плюсы членам от БРИКС,особенно России? Реальные плюсы?  https://otvet.mail.ru/question/240318570",
    "text": "Какие плюсы членам от БРИКС,особенно России? Реальные плюсы?",
    "title": "Ответы@Mail.Ru: Все категории",
    "url": " https://otvet.mail.ru/question/240318570",
    "from": "Тот Самый Сорванец",
}

Corrupted (after indexing into rollover alias)

{
    "_index": "prod-matchmaker-000362",
    "_id": "3905871722299717800",
    "_source": {
    "text_meta": "uncle_benny Well I could have told you Donnie,the world was never meant for oneas horrible as you. https://www.palmerreport.com/analysis/donald-trump-believes-in-nothing/58664/#comment-6576487289",
    "text": "I recently stayed at a 'Staypineapple' hotel in San Francisco.They give out warm pineapple shaped cookies .They were lovely.",
    "title": """Nieudana randka Pana Miszy, bursisty Korczaka na rogu Granicznej i Królewskiej - "Ja przecież, wieczorami, o dziewiątej, mam dyżur w sypialniach dzieci w Domu Sierot Korczaka".""",
    "url": "https://www.reclameaqui.com.br/serasaconsumidor_178670/quero-limpar-meu-nome_GBIb3k8K6z76ioKq/",
    "from": "南奴劉邦跪舔冒頓單虞"
    }
}

Elasticsearch does not corrupt data like that, so I suspect you have multiple records in your data frame with the same ID and that this is causing an overwrite/update during indexing. What is the version number associated with that document?

If you set the bulk action to create instead of index, any writes to already existing documents will fail, and you should be able to catch this in your application.
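With the Python client that would look something like this (index name and documents are placeholders):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

es = Elasticsearch("https://localhost:9200")  # placeholder connection details

docs = [
    {"_id": "3905871722299717800", "text": "foo"},
    {"_id": "3905871722299717800", "text": "bar"},  # duplicate id on purpose
]
actions = (
    {"_op_type": "create", "_index": "prod-matchmaker", "_id": d["_id"],
     "_source": {"text": d["text"]}}
    for d in docs
)

# With op_type create, a second write to an existing _id is rejected with a
# version conflict instead of silently overwriting the first document.
for ok, item in streaming_bulk(es, actions, raise_on_error=False):
    if not ok:
        print("rejected:", item)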

The easiest way is to have a single index and delete using delete-by-query based on a timestamp field. I assume you have already tried this.
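Assuming a date field such as timestamp, that would be roughly:

from elasticsearch import Elasticsearch

es = Elasticsearch("https://localhost:9200")  # placeholder connection details

# Remove everything older than 48 hours; runs as a background task.
es.delete_by_query(
    index="prod-matchmaker",
    query={"range": {"timestamp": {"lt": "now-48h"}}},
    conflicts="proceed",
    wait_for_completion=False,
)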

You are using a custom _id; for Elasticsearch to have written that document, it must have received a request to write a document with that same id.

I don't think this is possible; with an auto-generated _id every document will have a different id, so nothing will be updated.

Are your ids unique in your data source? How are they being generated? It looks like your issue is in your source or in the code you are using to write the data to Elasticsearch.

1 Like

Also, the _id is only unique within the same index. If you are using rollovers, making a request with the same _id may create a new document with the same _id if the write index has changed.

2 Likes

We will try setting the bulk action to create and look out for errors, thanks!

The delete-by-query method is our last resort as we've observed it to be a slow and expensive operation when deleting millions of docs.

Another approach we are considering is ILM without rollovers or an alias. It would be like this (rough sketch after the list):

  • create a new ILM policy which deletes any index with the pattern name-* when its max_age reaches 2 days
  • make the Spark workflow write to index name-<current_date>
  • in the search application, run the search queries on name-*. The bulk flag update will reference the _index field of each matched doc
  • rationale for setting max_age to 2 days: suppose the Spark workflow writes to name-2024-10-26 at 00:00 hours for the first time, resulting in the index creation. If max_age is 1 day, this index will live until 00:00 hours on 2024-10-27, and the search app will have only one hour to match anything that was written to the index at 23:00 hours on 2024-10-26. By making it 2 days, we give all new data at least 24 hours to get matched, which is our business requirement
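A rough sketch of that setup, using the Python client and placeholder names (ILM expresses the 2-day cutoff as the delete phase's min_age, counted from index creation when there is no rollover):

from datetime import datetime, timezone
from elasticsearch import Elasticsearch

es = Elasticsearch("https://localhost:9200")  # placeholder connection details

# Delete-only policy: no rollover, indices are removed 2 days after creation.
es.ilm.put_lifecycle(
    name="name-delete-after-2d",  # placeholder policy name
    policy={"phases": {"delete": {"min_age": "2d", "actions": {"delete": {}}}}},
)

# Attach the policy to every index matching name-* via an index template.
es.indices.put_index_template(
    name="name-template",
    index_patterns=["name-*"],
    template={"settings": {"index.lifecycle.name": "name-delete-after-2d"}},
)

# The Spark workflow would target the current day's index directly.
today_index = "name-" + datetime.now(timezone.utc).strftime("%Y-%m-%d")
es.index(index=today_index, id="3905871722299717800", document={"text": "foo"})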

Do you think this approach makes sense / has flaws?

That makes sense. But we are not seeing duplicate _id docs in ES; we are seeing fields mixed up in docs with unique _id.

If each document is associated with a timestamp, e.g. created time, and this is available in your application both at initial index time as well as when you update, you can use daily indices with ILM. In this case each index would cover data created during e.g. a single day, and the index name would be something like myindex-20241026. The application would calculate the correct index to send the document or update to based on the timestamp, and all indexing and update operations for a single document would go to the same index. ILM can still be used, but would be configured to delete data based on the index creation time. Note that you may need to prevent data older than X days from being written or updated, as it could create new indices that have already been deleted by ILM. You should also make sure you do not have future-dated documents.
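A minimal sketch of that routing, assuming a created timestamp on each document and placeholder names:

from datetime import datetime, timezone
from elasticsearch import Elasticsearch

es = Elasticsearch("https://localhost:9200")  # placeholder connection details

def daily_index(created_at: datetime) -> str:
    # Both the initial write and the later flag update derive the index name
    # from the same creation timestamp, so they always hit the same index.
    return "myindex-" + created_at.strftime("%Y%m%d")

created_at = datetime(2024, 10, 26, 23, 15, tzinfo=timezone.utc)
idx = daily_index(created_at)

es.index(index=idx, id="3905871722299717800",
         document={"text": "foo", "created": created_at.isoformat()})
es.update(index=idx, id="3905871722299717800", doc={"matched": True})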

This is most likely a sign that your document IDs are not unique at the time you are creating the documents.

1 Like

We are generating the unique ids upstream by hashing the text, url and from columns using xxhash64, and de-duplicating them in a separate table before writing to ES.
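In PySpark terms, the id generation is roughly this (column names follow the sample doc above, the id column name is a placeholder; distinct rows that happen to hash to the same value would be collapsed by the de-duplication step):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, xxhash64

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("Какие плюсы членам от БРИКС,особенно России?",
      "https://otvet.mail.ru/question/240318570",
      "Тот Самый Сорванец")],
    ["text", "url", "from"],
)

# 64-bit hash of the three columns, used as the custom _id downstream.
with_id = df.withColumn("doc_id", xxhash64(col("text"), col("url"), col("from")).cast("string"))
deduped = with_id.dropDuplicates(["doc_id"])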

It will be interesting to see if you get any errors when you change from index to create operation.

1 Like

Which version of Elasticsearch are you using? This is something you should always include as it may make a difference.

Yes sorry forgot to mention that.
v8.14.2
Maven package for the Spark writes: org.elasticsearch:elasticsearch-spark-30_2.12:8.11.3

1 Like