Filtering a Pivot Transform

I have a pivot transform that calculates a duration between two timestamped documents by grouping them, matching like documents, then aggregating the min timestamp, the max timestamp and using a script to calculate the duration between those two:

PUT _transform/transform-name
  "source": {
    "index": "source-index-name"
  "dest" : { 
    "index" : "dest-index-name"
  "pivot": {
    "group_by": { 
      "linkId": { "terms": { "field": "linkId" }},
      "string_metadata_hash": { "terms": { "field": "string_metadata_hash" }},
      "string_calculation_job_type": {
        "terms": {
          "script": {
            "source": """
              def m = /^calculation_job_(.*)_([^_]+)$/.matcher(doc['eventType'].value);
              if (m.matches()) {
    "aggregations": {
      "metadata": {
        "scripted_metric": {
          "init_script": "state.doc = new HashMap()",
          "map_script": "if (state.doc.isEmpty()){state.doc = new HashMap(params['_source'].metadata)}",
          "combine_script": "return state.doc",
          "reduce_script": "return states[0]"
      "start": {
        "min": {
          "field": "timestamp"
      "complete": {
        "max": {
          "field": "timestamp"
      "duration_sec": {
        "bucket_script": {
          "buckets_path": {
            "start": "start.value",
            "complete": "complete.value"
          "script": "return (params.complete - params.start)/1000;"
   "sync": {
    "time": { 
      "field": "timestamp",
      "delay": "60s"

This matches event Type calculation_job_foo_start with calculation_job_foo_complete to calculate the duration between the two.

However, it also results in a document when there is only a calculation_job_foo_start and no calculation_job_foo_complete - which results in a start and complete times that are the same (both start times) and a duration_sec of 0

Is there a way to prevent output to dest index until there is both a start and complete event type? In other words, filter the output?

You can define an ingest pipeline for dest. In the ingest pipeline you can use a drop processor to prevent documents from getting indexed.

Thanks @Hendrik_Muhs did not end up needing the ingest pipeline here, but did use the concept in other ways. The premise for my question ended up being a false one.

The duration_sec of 0 is temporary, and not a problem, as long as the delay is long enough to find all the documents needed.

So I did try this approach but cannot get it to work @Hendrik_Muhs

getting a compile error on doc within the if

PUT _ingest/pipeline/name-here
  "processors" : [
      "drop": {
        "if": "doc['duration_sec'] == 0.0"
      "set": {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"

When inserted into the ingest pipeline it isn't a doc yet, you need to use the ingest context.

Try replacing doc with ctx.

