Two Transform jobs overwrite each other's result docs (duplicate _id)

Hello,

While migrating existing, "home-made" transformations to the Transform API (continuous mode), I ran into one minor issue: adding static fields to the result documents. At first I tried runtime_mappings, but that was not very effective, since it basically passes the static field through the source instead of setting it on the result object, so I implemented this with ingest pipelines instead. My main issue, however, is that 2 of my 4 transform jobs overwrite each other's results by _id.
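For completeness, this is roughly the ingest pipeline approach I ended up with; a minimal sketch, assuming the pipeline is named foo (as referenced by dest.pipeline in the job configs below), with a placeholder field name and value:

PUT _ingest/pipeline/foo
{
  "description": "Sketch: add a static field to every transform result document (field name/value are placeholders)",
  "processors": [
    {
      "set": {
        "field": "some.static.field",
        "value": "some-static-value"
      }
    }
  ]
}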

The goal of these jobs is to calculate aggregated data for the same period (month). Because different query/grouping conditions apply, I can't do it with a single Transform job, so I split it into 4 jobs. Logically they can be described as:

  1. a > b
  2. a > c
  3. b > c
  4. a > c

where each symbol is an index. Each job has a different query and/or grouping, and each job produces a different result doc with different result field names. Yet somehow the last two, even with different sources, conditions and result fields, overwrite each other's result _id. How can I ensure uniqueness of the result doc _id in this scenario?

I have tested this behaviour several times: if I start all of them in the order listed, the results of job 3 disappear (overwritten by job 4); if I then recreate transform job 3 and trigger it, the same doc _id that was holding data from job 4 is replaced with data from job 3.
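This is how I check which job's output a monthly bucket currently holds; a minimal sketch with a hypothetical month range. After job 4 catches up, the same _id comes back with the all.bucket/success fields instead of count.percentiles/count.max:

GET foo-monthly/_search
{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2024-01-01",
        "lt": "2024-02-01"
      }
    }
  }
}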

Job 3:

PUT _transform/foo-3
{
  "source": {
    "index": [
      "foo-hourly"
    ]
  },
  "pivot": {
    "group_by": {
      "@timestamp": {
        "date_histogram": {
          "field": "@timestamp",
          "calendar_interval": "1M"
        }
      }
    },
    "aggregations": {
      "count.percentiles": {
        "percentiles": {
          "field": "count",
          "percents": [
            50
          ]
        }
      },
      "count.max": {
        "max": {
          "field": "count"
        }
      }
    }
  },
  "frequency": "1h",
  "dest": {
    "index": "foo-monthly",
    "pipeline": "foo"
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "1d"
    }
  },
  "settings": {
    "max_page_search_size": 500
  }
}

Job 4:

PUT _transform/foo-4
{
  "source": {
    "index": [
      "foo"
    ],
    "query": {
      "bool": {
        "should": [
          {
            "exists": {
              "field": "boo"
            }
          }
        ],
        "minimum_should_match": 1
      }
    }
  },
  "pivot": {
    "group_by": {
      "@timestamp": {
        "date_histogram": {
          "field": "@timestamp",
          "calendar_interval": "1M"
        }
      },
      "somekey": {
        "terms": {
          "field": "somekey"
        }
      }
    },
    "aggregations": {
      "all.bucket": {
        "filter": {
          "query_string": {
            "query": "*",
            "analyze_wildcard": true
          }
        },
        "aggs": {
          "count": {
            "value_count": {
              "field": "someresult"
            }
          }
        }
      },
      "success.bucket": {
        "filter": {
          "query_string": {
            "query": "NOT someresult:2 AND NOT someresult:3 AND NOT someresult:4",
            "analyze_wildcard": true
          }
        },
        "aggs": {
          "count": {
            "value_count": {
              "field": "someresult"
            }
          }
        }
      },
      "success.rate": {
        "bucket_script": {
          "buckets_path": {
            "total_count": "all.bucket>count",
            "success_count": "success.bucket>count"
          },
          "script": "params.success_count / params.total_count * 100"
        }
      }
    }
  },
  "frequency": "1h",
  "dest": {
    "index": "foo-monthly",
    "pipeline": "foo"
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "1d"
    }
  },
  "settings": {
    "max_page_search_size": 500
  }
}

How can I be sure that transform jobs will not overwrite each other's results?

For now I'm using a "workaround": adding the transform job ID as a runtime field and passing it through the group_by to resolve the _id collision, like:

PUT _transform/foo-3
{
  "source": {
    "index": [
      "foo"
    ],
    "runtime_mappings": {
      "transform.id": {
        "type": "keyword",
        "script": {
          "source": "emit('foo-3')"
        }
      }
    }
  },
  "pivot": {
    "group_by": {
      "@timestamp": {
        ...
      },
      "transform.id": {
        "terms": {
          "field": "transform.id"
        }
      },
      ...
   ...
}

Thanks

What you see is by design; we do not recommend letting multiple transforms write into the same destination index. Transform calculates the doc_id from the group_by bucket values, so if 2 transforms produce the same bucket values, the same doc_id gets produced.

I understand your use case and your workaround looks good to me. As alternatives, I can list 2 more options:

  • recommended: write into separate indices and query using a pattern, e.g. dest-* (see the first sketch below). The overhead of having 4 destination indices instead of 1 is negligible.
  • calculate and overwrite the doc_id yourself with a fingerprint processor in an ingest pipeline (see the second sketch below). As transform does, use the bucket values of the group_by fields, not the values produced by the aggregation part. The doc_id must be created in a way that is deterministic and repeatable, so documents can get overwritten. By using a different salt value for each transform, the individual transform results won't override each other.
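For the first option, the read side stays a single search over an index pattern; a minimal sketch, assuming the destinations were named foo-monthly-1 through foo-monthly-4 instead of a shared foo-monthly:

GET foo-monthly-*/_search

For the second option, a minimal sketch of such a pipeline, assuming a hypothetical name foo-3-id and job 3's single group_by field; each transform would reference its own pipeline (with its own salt) in dest.pipeline:

PUT _ingest/pipeline/foo-3-id
{
  "description": "Sketch: derive _id from the group_by bucket value, salted per transform so results cannot collide",
  "processors": [
    {
      "fingerprint": {
        "fields": ["@timestamp"],
        "target_field": "_id",
        "salt": "foo-3",
        "method": "SHA-256"
      }
    }
  ]
}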

In the future we are considering making the doc_id generation in transform configurable, e.g. by providing a salt value similar to the fingerprint processor.

