Filtering a Pivot Transform

I have a pivot transform that calculates the duration between two timestamped documents. It groups the documents so that matching ones land in the same bucket, aggregates the min and max timestamp, and uses a script to calculate the duration between the two:

PUT _transform/transform-name
{
  "source": {
    "index": "source-index-name"
  },
  "dest" : { 
    "index" : "dest-index-name"
  },
  "pivot": {
    "group_by": { 
      "linkId": { "terms": { "field": "linkId" }},
      "string_metadata_hash": { "terms": { "field": "string_metadata_hash" }},
      "string_calculation_job_type": {
        "terms": {
          "script": {
            "source": """
              def m = /^calculation_job_(.*)_([^_]+)$/.matcher(doc['eventType'].value);
              if (m.matches()) {
                return m.group(1);
              }
            """
          }
        }
      }
    },
    "aggregations": {
      "metadata": {
        "scripted_metric": {
          "init_script": "state.doc = new HashMap()",
          "map_script": "if (state.doc.isEmpty()){state.doc = new HashMap(params['_source'].metadata)}",
          "combine_script": "return state.doc",
          "reduce_script": "return states[0]"
        }
      },
      "start": {
        "min": {
          "field": "timestamp"
        }
      },
      "complete": {
        "max": {
          "field": "timestamp"
        }
      },
      "duration_sec": {
        "bucket_script": {
          "buckets_path": {
            "start": "start.value",
            "complete": "complete.value"
          },
          "script": "return (params.complete - params.start)/1000;"
        }
      }
    }
  },
  "sync": {
    "time": { 
      "field": "timestamp",
      "delay": "60s"
    }
  }
}

This matches eventType calculation_job_foo_start with calculation_job_foo_complete to calculate the duration between the two.

However, it also produces a document when there is only a calculation_job_foo_start and no calculation_job_foo_complete. In that case the start and complete times are the same (both are the start time) and duration_sec is 0.

Is there a way to prevent output to the dest index until both a start and a complete event type exist? In other words, can the output be filtered?

You can define an ingest pipeline for dest. In the ingest pipeline you can use a drop processor to prevent documents from getting indexed.
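
For reference, a transform sends its output through an ingest pipeline when the pipeline is named in dest. A sketch of the dest section of the transform, where the pipeline name is a placeholder and must match the name the pipeline was created under:

  "dest": {
    "index": "dest-index-name",
    "pipeline": "name-here"
  }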

Thanks @Hendrik_Muhs. I did not end up needing the ingest pipeline here, but did use the concept in other ways. The premise of my question turned out to be false.

The duration_sec of 0 is temporary, and not a problem, as long as the delay is long enough to find all the documents needed.

So I did try this approach but cannot get it to work, @Hendrik_Muhs.

I'm getting a compile error on doc within the if condition:

PUT _ingest/pipeline/name-here
{
  "processors" : [
    {
      "drop": {
        "if": "doc['duration_sec'] == 0.0"
      }
    },
    {
      "set": {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

When a document passes through the ingest pipeline it isn't an indexed doc yet, so doc is not available in the condition. You need to use the ingest context instead.

Try replacing doc with ctx.
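
A minimal sketch of the pipeline with that change applied, assuming duration_sec arrives as a top-level numeric field on the transform output. The null check and the description field are additions for illustration and were not part of the original pipeline:

PUT _ingest/pipeline/name-here
{
  "description": "Sketch: drop transform output whose duration_sec is 0 (assumes a top-level numeric duration_sec)",
  "processors" : [
    {
      "drop": {
        "if": "ctx.duration_sec != null && ctx.duration_sec == 0.0"
      }
    },
    {
      "set": {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

Documents that match the drop condition are discarded before indexing, so the set processor only runs for buckets with a non-zero duration.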
