Transform time-based event logs

Hello,

I'd like to import raw logs from an application
where time-based events report changes to items' state.

I'd like to transform the data so that each point in time reports the latest known state for each item.

For example, I have this data set:

POST test/_doc
{
  "item": 1,
  "date": "2020-03-10T14:12:12",
  "status": "A"
}

POST test/_doc
{
  "item": 2,
  "date": "2020-03-10T14:12:12",
  "status": "A"
}

POST test/_doc
{
  "item": 3,
  "date": "2020-03-10T14:12:12",
  "status": "A"
}

POST test/_doc
{
  "item": 1,
  "date": "2020-03-11T14:12:12",
  "status": "B"
}

POST test/_doc
{
  "item": 2,
  "date": "2020-03-11T14:12:12",
  "status": "B"
}

POST test/_doc
{
  "item": 1,
  "date": "2020-03-12T14:12:12",
  "status": "C"
}

POST test/_doc
{
  "item": 2,
  "date": "2020-03-12T14:12:12",
  "status": "C"
}

What I want to get is:

{
  "item": 1,
  "date": "2020-03-10T14:12:12",
  "status": "A"
}

{
  "item": 2,
  "date": "2020-03-10T14:12:12",
  "status": "A"
}

{
  "item": 3,
  "date": "2020-03-10T14:12:12",
  "status": "A"
}

{
  "item": 1,
  "date": "2020-03-11T14:12:12",
  "status": "B"
}

{
  "item": 2,
  "date": "2020-03-11T14:12:12",
  "status": "B"
}

{
  "item": 3,
  "date": "2020-03-11T14:12:12",
  "status": "A"
}

{
  "item": 1,
  "date": "2020-03-12T14:12:12",
  "status": "C"
}

{
  "item": 2,
  "date": "2020-03-12T14:12:12",
  "status": "C"
}

{
  "item": 3,
  "date": "2020-03-12T14:12:12",
  "status": "A"
}

So for dates 2020-03-11 and 2020-03-12, the status of item 3 is taken from the latest known event, which happened on 2020-03-10.

What is the best way to implement this?
An entity-centric index? Is there any simpler option?

Thanks

How many items do you have?

If the number is low and you do not plan to do further analysis, a query with top_hits might be good enough for your use case.

Hi, thanks for your answer.

The items don't have a fixed cardinality, as they are daily imports corresponding to some files. I think they might be similar to web sessions (they have a start and an end, and change over time).

I tried top_hits, but on top of that I'd need to join the results with a list of dates.

My goal is to plot the data on a line graph and see the latest known status by day.

This query returns the latest known status for each item, but it doesn't break the status down by date. How can I get that?

POST /test/_search?size=0
{
    "aggs": {
        "top_tags": {
            "terms": {
                "field": "item",
                "size": 3
            },
            "aggs": {
                "top_sales_hits": {
                    "top_hits": {
                        "sort": [
                            {
                                "date": {
                                    "order": "desc"
                                }
                            }
                        ],
                        "_source": {
                            "includes": [ "date", "item", "status" ]
                        },
                        "size" : 1
                    }
                }
            }
        }
    }
}

Any ideas? I'd really appreciate any guidance on this; top_hits doesn't seem to be enough for my case.
Thanks

We recently enhanced our documentation and added an example for getting the latest doc/state:

https://www.elastic.co/guide/en/elasticsearch/reference/7.7/transform-painless-examples.html#painless-top-hits

I hope that helps!

Thanks a lot, Hendrik, for your help. Do I need an X-Pack license for that?
Will that work only starting from version 7.7?

Transform is Basic licensed; Basic can be used free of charge.

scripted_metric in transform has been supported since the beginning, which is 7.2, so this example should work in older versions. You might want to use a continuous transform to keep your data up to date; for that you need at least 7.3.

(7.7 is not released yet; however, the examples on the page I sent work with older versions.)
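For reference, a continuous transform is just a regular transform with `sync` and `frequency` settings added. A minimal sketch (the transform id `my_continuous_transform`, the destination index `test_latest`, and the `max` aggregation are placeholders, not from this thread):

```json
PUT _transform/my_continuous_transform
{
  "source": { "index": "test" },
  "dest": { "index": "test_latest" },
  "frequency": "1m",
  "sync": {
    "time": { "field": "date", "delay": "60s" }
  },
  "pivot": {
    "group_by": {
      "item": { "terms": { "field": "item" } }
    },
    "aggregations": {
      "last_date": { "max": { "field": "date" } }
    }
  }
}
```

The `delay` accounts for ingest lag: the transform only processes events older than now minus the delay, so late-arriving documents are not missed.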

I wonder: was mentioning transform in the title of your post a coincidence? Maybe I got you wrong.

Was mentioning transform in the title of your post a coincidence?

No, I think I had a look at transform functions and somehow I got inspired.

So I'm trying this code against my data:

GET test/_search
{
  "aggs": {
    "test": {
      "scripted_metric": {
        "init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
        "map_script": """
          def current_date = doc['date'].getValue().toInstant().toEpochMilli();
          if (current_date > state.timestamp_latest) {
            state.timestamp_latest = current_date;
            state.last_doc = new HashMap(params['_source']);
          }
        """,
        "combine_script": "return state",
        "reduce_script": """
          def last_doc = '';
          def timestamp_latest = 0L;
          for (s in states) {
            if (s.timestamp_latest > timestamp_latest) {
              timestamp_latest = s.timestamp_latest;
              last_doc = s.last_doc;
            }
          }
          return last_doc
        """
      }
    }
  }
}

It gives me an interesting result, but only for item 1:

"aggregations" : {
    "test" : {
      "value" : {
        "date" : "2020-03-12T14:12:12",
        "item" : 1,
        "status" : "C"
      }
    }
  }

The idea is that you use it inside a transform:

POST _transform/_preview
{
  "source": {
    "index": "test"
  },
  "pivot": {
    "group_by": {
      "item": {
        "terms": {
          "field": "item"
        }
      }
    },
    "aggregations": {
      "test": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
          "map_script": """
            def current_date = doc['date'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest) {
              state.timestamp_latest = current_date;
              state.last_doc = new HashMap(params['_source']);
            }
          """,
          "combine_script": "return state",
          "reduce_script": """
            def last_doc = '';
            def timestamp_latest = 0L;
            for (s in states) {
              if (s.timestamp_latest > timestamp_latest) {
                timestamp_latest = s.timestamp_latest;
                last_doc = s.last_doc;
              }
            }
            return last_doc
          """
        }
      }
    }
  }
}

This is the preview endpoint; once you are happy with the result, you can turn it into a real transform by adding a destination and changing POST _transform/_preview to PUT _transform/my_transform.
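Putting that together, creating the transform might look like this (the transform id `my_transform` and the destination index `test_latest` are placeholder names; the pivot is the same as in the preview request):

```json
PUT _transform/my_transform
{
  "source": { "index": "test" },
  "dest": { "index": "test_latest" },
  "pivot": {
    "group_by": {
      "item": {
        "terms": { "field": "item" }
      }
    },
    "aggregations": {
      "test": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
          "map_script": """
            def current_date = doc['date'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest) {
              state.timestamp_latest = current_date;
              state.last_doc = new HashMap(params['_source']);
            }
          """,
          "combine_script": "return state",
          "reduce_script": """
            def last_doc = '';
            def timestamp_latest = 0L;
            for (s in states) {
              if (s.timestamp_latest > timestamp_latest) {
                timestamp_latest = s.timestamp_latest;
                last_doc = s.last_doc;
              }
            }
            return last_doc
          """
        }
      }
    }
  }
}
```

After creating it, start it with POST _transform/my_transform/_start; the destination index should then contain one latest-state document per item.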

That's great, I'll try to see how transforms work.

Otherwise, without using a transform, I feel like what I'm missing is a type of aggregation on dates where each group contains all dates from the minimum up to the current one. So, for example, for the following 3 dates

"2020-03-10T14:12:12"
"2020-03-11T14:12:12"
"2020-03-12T14:12:12"

I'd get the following 3 groups

"2020-03-10T14:12:12"

"2020-03-10T14:12:12"
"2020-03-11T14:12:12"

"2020-03-10T14:12:12"
"2020-03-11T14:12:12"
"2020-03-12T14:12:12"

In that case I'd add this grouping after the grouping by item, and I'd get what I want.

Do you think it is easy to get this sort of grouping in ES?
Thanks a lot for your help

Buckets are always created based on data; you cannot define buckets up front and then check whether data is available for them. For this case it seems better to write an external application that sends queries according to your bucketing. Such logic could also interpolate missing values, e.g. by remembering the last seen state and falling back to it when a search does not return a new value.
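That last-seen fallback can be sketched client-side. A minimal Python example (the event list mirrors the sample data from the thread; the function and variable names are illustrative, and in practice `events` would come from a search response):

```python
def forward_fill(events, days):
    """For each day, report the latest known status per item.

    events: list of dicts with "item", "date" (ISO string), "status".
    days:   list of "YYYY-MM-DD" strings, in ascending order.
    """
    last_seen = {}   # item -> latest status observed so far
    snapshots = {}   # day -> {item: status}, with gaps filled from last_seen
    ordered = iter(sorted(events, key=lambda e: e["date"]))
    pending = next(ordered, None)
    for day in days:
        # consume all events up to and including this day
        while pending is not None and pending["date"][:10] <= day:
            last_seen[pending["item"]] = pending["status"]
            pending = next(ordered, None)
        snapshots[day] = dict(last_seen)  # copy the filled state for this day
    return snapshots

events = [
    {"item": 1, "date": "2020-03-10T14:12:12", "status": "A"},
    {"item": 2, "date": "2020-03-10T14:12:12", "status": "A"},
    {"item": 3, "date": "2020-03-10T14:12:12", "status": "A"},
    {"item": 1, "date": "2020-03-11T14:12:12", "status": "B"},
    {"item": 2, "date": "2020-03-11T14:12:12", "status": "B"},
    {"item": 1, "date": "2020-03-12T14:12:12", "status": "C"},
    {"item": 2, "date": "2020-03-12T14:12:12", "status": "C"},
]
days = ["2020-03-10", "2020-03-11", "2020-03-12"]
result = forward_fill(events, days)
# item 3 keeps status "A" on the 11th and 12th, carried over from its last event
```

This reproduces the desired output from the top of the thread: item 3 appears in every daily snapshot even though it only has one event.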