Elastic Latest Transform is not working if the sync time field and the sort field are different

{
  "id": "poc.transform",
  "version": "7.17.11",
  "create_time": 1709818588566,
  "source": {
    "index": [
      "poc.test.source*"
    ],
    "query": {
      "match_all": {}
    }
  },
  "dest": {
    "index": "poc.test.transform"
  },
  "frequency": "1m",
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  },
  "latest": {
    "unique_key": [
      "entity_name.keyword"
    ],
    "sort": "occured_timestamp"
  },
  "settings": {
    "max_page_search_size": 500
  }
}

I am trying to get the latest record per entity from the source index and write it to the transform's destination index. I cannot rely on the ingestion time (@timestamp) to find the latest record, because data may be inserted out of chronological order. Instead, I have a field called occured_timestamp from which the latest record has to be determined, so I set it as the sort field. This does not seem to work: no matter what the value of occured_timestamp is, the data gets written to the destination index. Is anything wrong with my configuration?

Example Source Index

[
  {
    "entity_name": "enity1",
    "occured_timestamp": "2024-03-07T09:14:29.000Z",
    "@timestamp": "2024-03-07T14:02:59.000Z"
  },
  {
    "entity_name": "enity1",
    "occured_timestamp": "2024-03-07T08:01:29.000Z",
    "@timestamp": "2024-03-07T14:03:59.000Z"
  },
  {
    "entity_name": "enity1",
    "occured_timestamp": "2024-03-07T07:01:29.000Z",
    "@timestamp": "2024-03-07T14:04:59.000Z"
  }
]

Expected output in the destination index

[
  {
    "entity_name": "enity1",
    "occured_timestamp": "2024-03-07T09:14:29.000Z",
    "@timestamp": "2024-03-07T14:02:59.000Z"
  }
]
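As a rough illustration of the expected result (not of how the transform is implemented), the selection can be sketched in Python: for each unique key, keep the document with the greatest sort value. The helper name latest_per_key is hypothetical, and the sketch assumes all timestamps are UTC ISO-8601 strings in the same format, so that plain string comparison orders them correctly.

```python
def latest_per_key(docs, key_field, sort_field):
    """Return one document per key: the one with the greatest sort value."""
    latest = {}
    for doc in docs:
        key = doc[key_field]
        # Same-format UTC ISO-8601 strings compare correctly as strings.
        if key not in latest or doc[sort_field] > latest[key][sort_field]:
            latest[key] = doc
    return list(latest.values())


source_docs = [
    {"entity_name": "enity1",
     "occured_timestamp": "2024-03-07T09:14:29.000Z",
     "@timestamp": "2024-03-07T14:02:59.000Z"},
    {"entity_name": "enity1",
     "occured_timestamp": "2024-03-07T08:01:29.000Z",
     "@timestamp": "2024-03-07T14:03:59.000Z"},
    {"entity_name": "enity1",
     "occured_timestamp": "2024-03-07T07:01:29.000Z",
     "@timestamp": "2024-03-07T14:04:59.000Z"},
]

result = latest_per_key(source_docs, "entity_name", "occured_timestamp")
print(result)  # the single document with occured_timestamp 09:14:29
```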

Hi @shivaraj_kv ,

It seems you are using the @timestamp field to keep the transform in sync. However, I'd recommend using the occured_timestamp field instead (see our doc).

The "sync" part of your transform configuration should then look like this:

  "sync": {
    "time": {
      "field": "occured_timestamp",
      "delay": "60s"
    }
  }

Also, where does your occured_timestamp field come from? How old can it be (what is the maximum difference between occured_timestamp and @timestamp)?

My unique key is entity_name. The occured_timestamp values for different entity_name values can be the same or different, and documents can arrive in any order. For example, for entity1 the occured_timestamp can be today, while for entity2 it can be some time yesterday. The order of occured_timestamp is not guaranteed either.

Is the latest transform the right fit for this use case, or should we use a pivot transform? If so, how?

Did you try changing the sync field to "occured_timestamp", as I suggested?

You wrote

"No matter what is the value in occured_timestamp. Data seems to be getting ingested into transformed index. "

I think this is because you are using the @timestamp field to check for new documents. You should change your transform configuration as follows:

PUT _transform/poc-transform
{
  "source": {
    "index": [
      "poc.test.source*"
    ]
  },
  "latest": {
    "unique_key": [
      "entity_name.keyword"
    ],
    "sort": "occured_timestamp"
  },
  "dest": {
    "index": "poc.test.transform"
  },
  "sync": {
    "time": {
      "field": "occured_timestamp"
    }
  }
}
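One way to picture why the original configuration could behave as described is the simplified model below. It rests on an assumption rather than on the transform's actual implementation: each checkpoint only considers documents whose sync field falls inside the new time window, picks the latest by sort field within that window, and overwrites the destination document for that key. The function name run_checkpoints and the one-minute windows are hypothetical.

```python
def run_checkpoints(docs, windows, key_field, sort_field, sync_field):
    """Simplified model (an assumption, not Elasticsearch code): per window,
    take the latest doc per key among docs in that window and overwrite dest."""
    dest = {}
    for lower, upper in windows:
        latest_in_window = {}
        for doc in docs:
            # Only documents whose sync field is inside [lower, upper) are seen.
            if not (lower <= doc[sync_field] < upper):
                continue
            key = doc[key_field]
            best = latest_in_window.get(key)
            if best is None or doc[sort_field] > best[sort_field]:
                latest_in_window[key] = doc
        # Destination docs are overwritten without looking at their old sort value.
        dest.update(latest_in_window)
    return dest


docs = [
    {"entity_name": "enity1", "occured_timestamp": "2024-03-07T09:14:29.000Z",
     "@timestamp": "2024-03-07T14:02:59.000Z"},
    {"entity_name": "enity1", "occured_timestamp": "2024-03-07T08:01:29.000Z",
     "@timestamp": "2024-03-07T14:03:59.000Z"},
    {"entity_name": "enity1", "occured_timestamp": "2024-03-07T07:01:29.000Z",
     "@timestamp": "2024-03-07T14:04:59.000Z"},
]

# Hypothetical one-minute checkpoint windows on the sync field.
windows = [
    ("2024-03-07T14:02:00.000Z", "2024-03-07T14:03:00.000Z"),
    ("2024-03-07T14:03:00.000Z", "2024-03-07T14:04:00.000Z"),
    ("2024-03-07T14:04:00.000Z", "2024-03-07T14:05:00.000Z"),
]

dest = run_checkpoints(docs, windows, "entity_name", "occured_timestamp", "@timestamp")
print(dest["enity1"]["occured_timestamp"])  # 2024-03-07T07:01:29.000Z
```

Under this model, syncing on @timestamp leaves the last-ingested document in the destination, even though its occured_timestamp is the oldest of the three.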

Yes, I tried. But it is skipping some values because occured_timestamp values are ingested out of order.

"But it is skipping some values due to unordered ingestion of occured_timestamp"

If it's skipping old occured_timestamp values for entities, I think that's expected behavior, since as I understand it you only want the latest timestamp for each entity.

If it misses intermediate entity_names with more recent occured_timestamp, you can mitigate that by lowering the frequency value (the interval between checks) so that your transform checks for changes in the source index more often.

If the transform is not lagging, the worst-case delay you'll face is:
query_delay + frequency (query_delay here being the delay configured under sync.time).
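As a small worked example of that sum, here is a sketch in Python using the values from the configuration in this thread (delay of 60s, frequency of 1m). The helper names are hypothetical, and the parser only handles the s, m and h suffixes, which is an assumption made for brevity.

```python
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def to_seconds(value: str) -> int:
    """Convert a time value such as '60s' or '1m' to seconds."""
    return int(value[:-1]) * UNIT_SECONDS[value[-1]]


def worst_case_lag_seconds(delay: str, frequency: str) -> int:
    """Worst-case delay when the transform is not lagging: delay + frequency."""
    return to_seconds(delay) + to_seconds(frequency)


print(worst_case_lag_seconds("60s", "1m"))  # 120
```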

Hope that helps

Another reason for missing updates, as pointed out by @przemekwitek here, is the following: when the document is ingested into the source index, its timestamp is already "old", i.e. more than 60s (the configured delay) older than the current server time.

Yet that can be fixed by either:

  • using the @timestamp field in the sync.time property, as you did originally, with a lower frequency setting,
  • or, if @timestamp is not the ingestion time, setting up an ingest pipeline on the source index that populates a new event.ingested field for every source document, and then using the event.ingested field in the sync.time section of the transform config.
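The second option could look roughly like the sketch below, which builds the request bodies as Python dicts and prints them as JSON so they can be pasted into a console or sent with any HTTP client. The pipeline name add-event-ingested is hypothetical, and attaching the pipeline through the index.default_pipeline setting is one possible choice, not something stated in this thread. The sketch also assumes event.ingested ends up mapped as a date field in the source index.

```python
import json

PIPELINE_ID = "add-event-ingested"  # hypothetical pipeline name

# Body for: PUT _ingest/pipeline/add-event-ingested
pipeline_body = {
    "description": "Stamp every document with the time it was ingested",
    "processors": [
        # _ingest.timestamp is the time the ingest node received the document.
        {"set": {"field": "event.ingested", "value": "{{{_ingest.timestamp}}}"}}
    ],
}

# Index setting that runs the pipeline for documents indexed without an
# explicit pipeline (assumption: applied to the source indices).
index_settings_body = {"index.default_pipeline": PIPELINE_ID}

# "sync" section of the transform config, now based on ingestion time.
transform_sync = {"time": {"field": "event.ingested", "delay": "60s"}}

for title, body in [
    ("ingest pipeline", pipeline_body),
    ("source index settings", index_settings_body),
    ("transform sync section", transform_sync),
]:
    print(f"# {title}")
    print(json.dumps(body, indent=2))
```

The sort field in the "latest" section stays occured_timestamp; only the field used to detect new documents changes.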
