Forming a new document with all the fields every time a new event comes in

Hello, I am currently trying to figure this out and would love to know if my current approach is correct and, if not, whether there is a way to do it. The question is:
We have events coming in, each event having a minimum of 3 states. The first one is a complete event, but the rest carry only the fields that changed. The task is: when these events come in, form the complete event from its past states and add it to a new index. For example:

State 1:
{
  "state": "create",
  "startTime": "2020-12-13T21:34:16.978Z",
  "lastUpdateTime": "2020-12-13T21:34:16.978Z",
  "id": "01",
  "name": "xyz",
  "data": ["abc", "c"]
}

State 2:
{
  "state": "Inprogress",
  "lastUpdateTime": "2020-12-13T21:34:16.978Z",
  "id": "01"
}

State 3:
{
  "state": "Completed",
  "lastUpdateTime": "2020-12-13T21:34:16.978Z",
  "id": "01"
}

As you can see, not all states come with all the fields, so when we create this document in the new index we have to add not only the updated fields but also the fields that were not present in that particular state.
Some of the things that I have already looked into:
Collapse: does not work, as it just returns the latest document.
Transform (latest): does not work, as it just replaces everything with the latest document.

Would love to know if a pivot transform would work, or if there is any other way to do this.

I think what you describe is a case for pivot. I would group_by id and e.g. use max(lastUpdateTime). Regarding filling in sparse data: I think you need a scripted_metric to implement this. You could look for the fields and keep whatever value is not null.

It might get complicated if it's possible to get conflicts, e.g. 2 different names; in this case you need a tiebreaker, e.g. take the latest value.
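
A minimal skeleton of that idea could look like this (untested; events is a placeholder source index name, and it assumes id is mapped as a keyword):

POST _transform/_preview
{
  "source": { "index": "events" },
  "pivot": {
    "group_by": {
      "id": { "terms": { "field": "id" } }
    },
    "aggregations": {
      "lastUpdateTime": { "max": { "field": "lastUpdateTime" } }
    }
  }
}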

Thank you for the reply. So, I started working with pivot and used group_by on id. After that I could not use max(lastUpdateTime) on its own, as there can be fields in a previous version of the document that I still need, so instead I did a terms aggregation and, for every term, ordered it by its lastUpdateTime. But the problem is that the result now comes with that timestamp embedded in the field, and it also feels a bit complicated. Would love to know how I would be able to do a scripted_metric on this. Here is a snippet of what I wrote:

POST _transform/_preview
{
  "source": {
    "index": "test3",
    "query": {
      "match_all": {}
    }
  },
  "dest": {
    "index": "test4"
  },
  "pivot": {
    "group_by": {
      "id": { "terms": { "field": "id" } }
    },
    "aggs": {
      "state": {
        "terms": {
          "field": "state",
          "size": 1,
          "order": {
            "last": "desc"
          }
        },
        "aggs": {
          "last": {
            "max": {
              "field": "lastUpdateTime"
            }
          }
        }
      },
      "name": {
        "terms": {
          "field": "name",
          "size": 1
        }
      }
    }
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  }
}

@Hendrik_Muhs Also, if possible, can you share an example of a scripted_metric that would fit this scenario? Thanks again.

@karanp I formatted your post, hope that's ok (it's triple backticks ` instead of single quotes ').

Regarding your use case, a scripted metric that keeps the latest state is part of our examples.

Regarding your join requirement, without a guarantee that this works:

    "aggregations": {
      "properties": {
        "scripted_metric": {
          "init_script": "state.join = new HashMap()",
          "map_script": "String[] fields = new String[] {'startTime', 'name', 'data'}; for (e in fields) { if (doc.containsKey(e)) {state.join.put(e, doc[e])}}",
          "combine_script": "return state.join",
          "reduce_script": "String[] fields = new String[] {'startTime', 'name', 'data'}; Map j=new HashMap(); for (s in states) {for (e in fields) { if (s.containsKey(e)) {j.put(e, s[e].get(0))}}} return j;"
        }
      }
    }
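
To plug it in, the whole thing could sit inside the pivot of a preview request, roughly like this (again, no guarantee; it assumes startTime, name and data have doc values, e.g. keyword/date mappings):

POST _transform/_preview
{
  "source": { "index": "test3" },
  "pivot": {
    "group_by": {
      "id": { "terms": { "field": "id" } }
    },
    "aggregations": {
      "lastUpdateTime": { "max": { "field": "lastUpdateTime" } },
      "properties": {
        "scripted_metric": {
          "init_script": "state.join = new HashMap()",
          "map_script": "String[] fields = new String[] {'startTime', 'name', 'data'}; for (e in fields) { if (doc.containsKey(e) && doc[e].size() != 0) { state.join.put(e, new ArrayList(doc[e])) } }",
          "combine_script": "return state.join",
          "reduce_script": "String[] fields = new String[] {'startTime', 'name', 'data'}; Map j = new HashMap(); for (s in states) { for (e in fields) { if (s.containsKey(e)) { j.put(e, s[e].get(0)) } } } return j;"
        }
      }
    }
  }
}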

We know using scripted_metric/Painless is complicated, but that's the only way today. In the future we may improve the out-of-the-box experience by providing better functions. Nevertheless, once you have figured out the required scripts, you should be fine.

@Hendrik_Muhs Thank you for replying and also for formatting my post. Over the last few days I went over the transforms that you suggested and I was able to get the most recently updated value of each field and merge them to create a document. But unfortunately this does not completely fit my case: at the end of this it updates a document in the new index, whereas for my case I need to create a new document every time this runs. Let me share the script that I wrote.

"init_script": "state.time = 0L; state.last_val = ''",
 "map_script": "def curr_date = doc[params.time].getValue().toInstant().toEpochMilli();
        if (doc[params.field].size()!=0 && curr_date>state.time)
        {
          state.time = curr_date;
          state.last_val =  params['_source'][params.field];
        }",
          "combine_script": "return state",
          "reduce_script": "def last_value = '';
            def newTime = 0L;
            for (s in states){
              if (s.time > newTime){
                newTime = s.time;
                last_value = s.last_val;
              }
            }  
            return last_value"

So, after I run this the document gets created in the new index. But once a new document with an update comes in, it just updates in place, which is a problem for me. I am looking into reindex or an ingest pipeline for now but not sure if that will be helpful. Please let me know if you think any of that would work. Thanks.

Updating documents is how transform works; that's by design.

However, you can let transform write through an ingest pipeline. In the ingest pipeline you can change _id so that a new document gets written every time. E.g. you could take the id created by transform and append lastUpdateTime to it. As lastUpdateTime should be unique per id, you will get a new doc for every event.
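
A sketch of such a pipeline (the name add_event_id is made up, and it assumes the transform output contains id and lastUpdateTime fields):

PUT _ingest/pipeline/add_event_id
{
  "description": "Give every transform result a unique _id so each run creates a new document",
  "processors": [
    {
      "set": {
        "field": "_id",
        "value": "{{id}}_{{lastUpdateTime}}"
      }
    }
  ]
}

The transform then references it in its dest section:

"dest": {
  "index": "test4",
  "pipeline": "add_event_id"
}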

Last but not least, the ingest pipeline allows you to re-arrange the fields / change the schema of the document. If you do this, you should create the index for the transform yourself before you start the transform, because otherwise transform will auto-create the destination index with the mappings derived from the pivot alone.
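
For example (illustrative mappings only, adjust the types to your data):

PUT test4
{
  "mappings": {
    "properties": {
      "id":             { "type": "keyword" },
      "state":          { "type": "keyword" },
      "name":           { "type": "keyword" },
      "data":           { "type": "keyword" },
      "startTime":      { "type": "date" },
      "lastUpdateTime": { "type": "date" }
    }
  }
}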

@Hendrik_Muhs Thank you for the suggestion. A transform with an ingest pipeline might just work for me.
