Finding optimal solution to use filter in aggregations of transform index

We are trying to create transform index, Following sample data is our subset where we want to find most recent event date based our duty code.

code	enterprise	   duty	     visitdate
2247 	HASTY&TASTY		1		Jul 27, 2020 @ 00:00
2247 	HASTY&TASTY		2		Jul 26, 2020 @ 00:00
2247 	HASTY&TASTY		0       Jul 25, 2020 @ 00:00
2247 	HASTY&TASTY		2	    Jun 30, 2020 @ 00:00
2247 	HASTY&TASTY		1       Jun 22, 2020 @ 00:00
2213	DunkinDonut		0		Jul 28, 2020 @ 00:00
2213	DunkinDonut		2		Jul 27, 2020 @ 00:00
2213	DunkinDonut		2		Jul 26, 2020 @ 00:00

Transform index should have only most recent dated docs where duty=2.

code     enterprise      visitdate
2247     HASTY&TASTY     Jul 26, 2020 @ 00:00
2213     DunkinDonut     Jul 27, 2020 @ 00:00

This script for transformation is working fine, where we want to know any other alternate method to implement the same, because it required multiple scripted fields for multiple use cases in same transform index where it may create loading issues.

POST _transform/_preview
{
  "source": {
    "index": [
      "visitdata*"
    ],
    "query": {
      "exists": {
        "field": "visitdate"
      }
    }
  },
  "dest": {
    "index": "nx009"
  },
  "pivot": {
    "group_by": {
      "code": {
        "terms": {
          "field": "code.keyword"
        }
      },
      "enterprise": {
        "terms": {
          "field": "enterprise.keyword"
        }
      }
    },
    "aggregations": {
      "visitdate_doc": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
          "map_script": """ 
        
        def current_date = doc['visitdate'].getValue().toInstant().toEpochMilli();
        def visited = doc['duty'].getValue();
        if (current_date > state.timestamp_latest && visited==2 )
        {state.timestamp_latest = current_date;
        state.last_doc = new HashMap(params['_source']);}
      """,
          "combine_script": "return state",
          "reduce_script": """ 
        def last_doc = '';
        def timestamp_latest = 0L;
        for (s in states) 
        {
          if (s.timestamp_latest > (timestamp_latest))
        {
          timestamp_latest = s.timestamp_latest; last_doc = s.last_doc;
          
        }
          
        }if(last_doc != null && !last_doc.isEmpty())
            {
            return last_doc.visitdate;
            }
      """
        }
      }
    }
  }
}

Hi, kindly guide us

Hi,

I do not see anything wrong with your approach. Getting the last state is one of the top ask for transform and we might have a ootb solution for that in future. Today you need scripted_metric, we know this is complicated and fragile, but as said, there is no alternative at the moment.

Regarding your config: Are you always only interested in data points with duty == 2 (or duty>=2)? If so, I think it is better for performance, if you filter in the query, instead of filtering as part of scripted_metric. It would also be possible to put a filter aggregation right before scripted_metric, in case you do not want to filter globally.

Thanks @Hendrik_Muhs

Actually we want to have many max dates based on individual duty codes using multiple scripted_metric fields in same transform index, index filtering or filter aggregation may not suit for our case.

FWIW a sub-agg filter would look like this:

{
  "source": {
    ...
  },
  "pivot": {
    "group_by": {
      ... 
    },
    "aggregations": {
      "visited_twice": {
        "filter": {
          "term": {
            "visited": 2
          }
        },
        "aggs": {
          "visitdate_doc": {
            "scripted_metric": {
...

That means you filter out visited!=2 before the scripted_metric (and you can remove the check in the script). Note that the result would have an additional level (can be later moved using an ingest pipeline):

"visited_twice": {
  "visitdate_doc": { ... }
}

This solution should work, however I can not say if/how much performance you gain.

(FWIW our benchmark tool rally has support for transform.)

Hi @Hendrik_Muhs ,

we are creating few fields based on above sub-agg filter with range, where we want to create a fields based on 30 day and 45 day... avg value in transform index.

{
  "source": {
    "index": [
      "sales*"
    ]
  },
  "pivot": {
    "group_by": {
      "customer.keyword": {
        "terms": {
          "field": "customer.keyword"
        }
      },
      "locationcode.keyword": {
        "terms": {
          "field": "locationcode.keyword"
        }
      }
    },
    "aggregations": {
      "avg30days": {
        "filter": {
          "range": {
            "journeydate": {
              "gte": "now-30d/d",
              "lte": "now/d"
            }
          }
        },
        "aggs": {
          "avg_30val": {
            "avg": {
              "field": "linenetamount"
            }
          }
        }
      },
      "avg45days": {
        "filter": {
          "range": {
            "journeydate": {
              "gte": "now-45d/d",
              "lte": "now/d"
            }
          }
        },
        "aggs": {
          "avg_45val": {
            "avg": {
              "field": "linenetamount"
            }
          }
        }
      }
    }
  }
}

this is the error we are getting.

{
  "error": {
    "root_cause": [
      {
        "type": "status_exception",
        "reason": "Unsupported aggregation type [filter]"
      }
    ],
    "type": "status_exception",
    "reason": "Failed to validate configuration",
    "caused_by": {
      "type": "status_exception",
      "reason": "Unsupported aggregation type [filter]"
    }
  },
  "status": 400
}

please let us know how to rectify this error or any other logic to acheive the same.

Hey @InfiniteDreamer,

filter agg support was added in Elasticsearch 7.7. Can you confirm your version?