Transform not updating documents

Hi

I have a continuous transform running with the following query applied, which should only allow documents with the status Open or Closed into the transform destination. However, when documents with these statuses are updated in the source to another status, such as Archived, the change doesn't get reflected in the destination.
My understanding was that if a document changed like this, it should no longer be in the destination index.
For reference, the source index is populated via Logstash from a Kafka broker.

{
  "bool": {
    "should": [
      {
        "bool": {
          "should": [{"match_phrase": { "status": "Open" } } ],
          "minimum_should_match": 1
        }
      },
      {
        "bool": {
          "should": [{"match_phrase": { "status": "Closed" }}],
          "minimum_should_match": 1
        }
      }
    ],
    "minimum_should_match": 1
  }
}
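
Side note: the doubly nested `bool`/`should` structure above is redundant; a single `should` list is equivalent. And if `status` is indexed as a `keyword` field (an assumption, not stated in the post), the same filter can be written more compactly as a `terms` query, e.g.:

```json
{
  "bool": {
    "filter": [
      { "terms": { "status": ["Open", "Closed"] } }
    ]
  }
}
```

If `status` is a `text` field, the original `match_phrase` clauses are the right tool; with a `keyword` sub-field you could target `status.keyword` instead.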

Snippet of the config

"frequency": "1m",
  "sync": {
    "time": {
      "field": "ingest_timestamp",
      "delay": "300s"
    }
  },
  "latest": {
    "unique_key": ["status_id" ],
    "sort": "ingest_timestamp"
  },

I don't know a heap about transforms, but I can see that this might be a current limitation - Transform limitations | Elasticsearch Guide [8.5] | Elastic

By specifying a query you basically define a filter. Documents that aren't matched are not processed by the transform. That's equivalent to a search request with aggregations: the aggregation only runs on documents that passed the search, which executes first.

In other words: your search query filters out anything that isn't either Open or Closed. A document with status Archived never even makes it into the transform part.

A query filter isn't suitable for your use case; you should let documents of all statuses pass through the query. That means documents with status Archived will also end up in the destination index. If you want to create a dashboard with all Open and Closed entities, you can apply your query filter to the destination index instead.
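
For example, the dashboard (or any search) could carry the same filter against the destination index at query time. A sketch, assuming a hypothetical destination index name `dest-index`:

```json
GET dest-index/_search
{
  "query": {
    "bool": {
      "should": [
        { "match_phrase": { "status": "Open" } },
        { "match_phrase": { "status": "Closed" } }
      ],
      "minimum_should_match": 1
    }
  }
}
```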

To curate the destination index and delete old documents, have a look at retention_policy; it allows you to delete old documents based on a timestamp.
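
retention_policy is configured on the transform itself. A sketch, reusing the `ingest_timestamp` field from the config above (the `max_age` value is illustrative):

```json
"retention_policy": {
  "time": {
    "field": "ingest_timestamp",
    "max_age": "30d"
  }
}
```

With this, documents in the destination index whose `ingest_timestamp` is older than `max_age` are deleted as part of each checkpoint.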


Ah OK, I thought transforms worked slightly differently: if the source document matched by the unique key was no longer in scope for the transform, the corresponding document in the destination would be removed.

I will check out retention_policy, but I don't mind how old a document is as long as it has the status Open or Closed.

@Hendrik_Muhs maybe I am missing something here, but retention_policy doesn't seem to be what I am looking for?

The destination index would contain approximately 100m documents and be 120gb if I allowed all statuses to flow through to the destination index, so dashboard performance would be poor (based on initial testing).

It sounds like you're trying to fix another issue by taking this approach, perhaps we can dig into the underlying problem you've highlighted in another topic?

@warkolm maybe some background will add context here.
We ingest data from Kafka; it's a large volume of data (120gb / 100m docs), and the dashboard should only visualise documents in status Open or Closed.

The source index gets updated via Logstash when a document changes status. The idea was to use a transform to create a subset of documents (25m / 26gb) that would be easier to visualise on a dashboard, including runtime fields and categorical data.

++

It'd be good to start a new topic with that and some more info on the issues you are seeing when you load it, that way it can be kept separate from the transform question.

retention_policy works time-based: it basically queries for docs that are older than a specified age. If you want to delete based on certain keyword values, e.g. a status, retention_policy is unfortunately not what you are looking for. We've come across that use case before; I think there is an enhancement request for it, but you are right, this is currently not supported.

You can however help yourself. You need something that can fire a query at regular intervals, e.g. nightly. This could be a simple unix cron job or a Watcher. To delete documents with e.g. status Archived, you run a delete-by-query.
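
A sketch of such a delete-by-query against a hypothetical destination index named `dest-index` (the status value Archived is taken from the thread):

```json
POST dest-index/_delete_by_query
{
  "query": {
    "match_phrase": { "status": "Archived" }
  }
}
```

Scheduled nightly (via cron and curl, or a Watcher), this would keep the destination index limited to Open and Closed documents, at the cost of a short window where recently archived documents are still present.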