Is there a way to implement a keep last N docs per id in a specific index?

Hello all,

I have a weird question: I would like to know if there is a way to implement a grouping filter on an index. For example:

Say for example that we have an index where each doc is one customer purchase, and we keep all purchases from an online store in this index.

Is there a way (I am trying to think how it would be possible with data frame transforms or ingest processors) to create a new index that keeps only the last 50 purchases of each customer?

Thanks in advance

We have a similar situation and we solved it via a list.
It works in our data flow scenario, so it might not apply to your case. We don't mind the larger document payload because, in our experience, it is more efficient.
In your example, what we ended up doing would amount to putting each customer into an individual document.
That document has a field called "purchase_history", which is a list.
Each time we get a new purchase from a customer, we fetch the customer record, append the purchase to the end of the "purchase_history" list, check the size of the list, and trim from the head if it exceeds 50 (see the sketch below).

Something like that. Of course we also have caching and sharding by customerID so that the same thread handles the same customer, etc.
Basically, it's the application doing what you are looking for rather than a built-in ES feature.
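
If you wanted to keep that append-and-trim logic inside Elasticsearch rather than purely in application code, a minimal sketch could be an update script along these lines. The index name customers, the document id placeholder, the field purchase_history and the new_purchase parameter are assumptions for illustration, not anything from this thread:

POST customers/_update/<customer_id>
{
  "script": {
    "lang": "painless",
    "source": """
      // append the new purchase, then trim from the head while the list exceeds 50
      if (ctx._source.purchase_history == null) {
        ctx._source.purchase_history = [];
      }
      ctx._source.purchase_history.add(params.new_purchase);
      while (ctx._source.purchase_history.size() > 50) {
        ctx._source.purchase_history.remove(0);
      }
    """,
    "params": {
      "new_purchase": {
        "order_id": 42,
        "date": "2019-12-01T10:00:00Z"
      }
    }
  }
}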

A list (without nested mapping) behaves the same way for searching as if the purchases were individual documents. The difference is that you get a hit on the customer record rather than on the individual purchases.
Say your search matches 3 purchase records that all belong to the same customer: the organization above would yield 1 result, whereas your existing organization would yield 3.

First, there are no "weird questions". It is an interesting use case.

This can indeed be done with transforms. For this case you create a transform that groups by customer id and collapses the purchases into a list. I described an example in an advent calendar post, unfortunately only in German. Taking it from there, you can start with a scripted metric like this:

"all_purchases": {
  "scripted_metric": {
    "init_script": "state.docs = []",
    "map_script": "state.docs.add(new HashMap(params['_source']))",
    "combine_script": "return state.docs",
    "reduce_script": "def docs = []; for (s in states) {for (d in s) { docs.add(d);}}return docs"
  }
}
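
For context, the scripted metric sits inside the pivot section of a transform. A rough sketch of the whole request could look like the following; the transform id, the index names purchases and purchases_by_customer, and the customer_id field are assumed here, not taken from the original post:

PUT _transform/last_purchases_per_customer
{
  "source": { "index": "purchases" },
  "dest": { "index": "purchases_by_customer" },
  "pivot": {
    "group_by": {
      "customer_id": { "terms": { "field": "customer_id" } }
    },
    "aggregations": {
      "all_purchases": {
        "scripted_metric": {
          "init_script": "state.docs = []",
          "map_script": "state.docs.add(new HashMap(params['_source']))",
          "combine_script": "return state.docs",
          "reduce_script": "def docs = []; for (s in states) {for (d in s) { docs.add(d);}} return docs"
        }
      }
    }
  }
}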

This would create a list of all purchases:

"all_purchases" : [
  {
    "order_id" : 42,
    "date" : "2019-12-01T10:00:00Z",
...
  },
  {
    "order_id" : 99,
    "date" : "2019-12-02T12:00:00Z",
...
  },
...
]

To get the top N, two possibilities come to mind: "at runtime" or "as post-processing".

Post-processing: sort the list afterwards by order date and cut it at n. You can do this as part of the reduce script, or you can write the output of the transform through an ingest pipeline and use a script processor.
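
A minimal sketch of the pipeline variant, assuming an ingest pipeline named trim_purchases and the all_purchases / date field names from the example output above (all of these names are placeholders):

PUT _ingest/pipeline/trim_purchases
{
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": """
          // sort purchases newest-first by date, then keep only the first 50
          def docs = ctx.all_purchases;
          docs.sort((d1, d2) -> d2.date.compareTo(d1.date));
          ctx.all_purchases = docs.subList(0, (int) Math.min(docs.size(), 50));
        """
      }
    }
  ]
}

The transform's dest section can then reference this pipeline so every transformed document is trimmed on the way in.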

At runtime: instead of a list, use a sorted map, a TreeMap, and map order_date to the order object. On insert, you only add to the tree map if it either has not yet reached n entries or its first key, which is the lowest one, is lower than the key you are looking at; afterwards you trim it back to n. Memory-wise this is the most efficient way, as it does not keep all orders in memory. Memory consumption might not be a problem for an e-commerce use case, but for IoT it could be.
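
A rough sketch of that runtime variant, assuming the date field from the example output and n = 50; for simplicity it inserts first and then trims, rather than checking the first key before inserting as described above:

"last_purchases": {
  "scripted_metric": {
    "init_script": "state.docs = new TreeMap()",
    "map_script": "def d = new HashMap(params['_source']); state.docs.put(d.date, d); if (state.docs.size() > 50) { state.docs.remove(state.docs.firstKey()); }",
    "combine_script": "return state.docs",
    "reduce_script": "def merged = new TreeMap(); for (s in states) { merged.putAll(s); } while (merged.size() > 50) { merged.remove(merged.firstKey()); } return new ArrayList(merged.values())"
  }
}

Because ISO 8601 timestamps sort lexicographically, removing firstKey() always drops the oldest purchase.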

I hope I gave you an idea and would be happy if you share your result.

Thank you both very much for your replies.

Hendrik, I think this is exactly what I was looking for. This is what I ended up implementing based on your example. In our use case we were OK with post-processing, so I implemented the sorting (descending) and cutting at the first N orders:

      "last_500_orders": {
  		"scripted_metric": {
  			"init_script" : "state.docs = []",
  			"map_script" : "state.docs.add(new HashMap(params['_source']))",
  			"combine_script" : "return state.docs",
  			"reduce_script" : "def docs = []; for (s in states) {for (d in s) { docs.add(d);}} docs.sort((d1, d2) -> d2.order_date.compareTo(d1.order_date)); return docs.subList(0, (int)Math.min(docs.size(), N));"
  		}
  }

I'll monitor this performance-wise and report back my observations.

Thanks again for the help.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.