Can a transform with init, map, combine, and reduce scripts emit multiple documents?


In an index we have multiple documents for sequential steps that are part of a single transaction.
We need to calculate the elapsed times for each step.

Instead of emitting one document with all timings for all steps, like this:

        "_source" : {
          "transaction_id" : "0000017cf17ef9c5-116feb",
          "start_time" : "2021-11-08T11:30:52.174Z",
          "end_time" : "2021-11-08T11:30:52.248Z",
          "total_elapsed_ms" : 51,
          "processing_time" : {
            "step-0" : {
              "transaction_id" : "0000017cf17ef9c5-116feb",
              "cum_elapsed_ms" : 0,
              "elapsed_ms" : 0,
              "handling_time" : "2021-11-08T11:30:52.174Z"
            },
            "step-1" : {
              "transaction_id" : "0000017cf17ef9c5-116feb",
              "cum_elapsed_ms" : 15,
              "elapsed_ms" : 15,
              "handling_time" : "2021-11-08T11:30:52.189Z"
            },
            "step-2" : {
              "transaction_id" : "0000017cf17ef9c5-116feb",
              "cum_elapsed_ms" : 51,
              "elapsed_ms" : 36,
              "handling_time" : "2021-11-08T11:30:52.225Z"
            }
          }
        }

I need to have a document for each step.
Is there a way to achieve this?
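For illustration, the desired result would be separate documents, one per step, roughly like this (the exact field layout is my own sketch, derived from the single-document output above):

```json
{ "transaction_id": "0000017cf17ef9c5-116feb", "step": "step-0", "elapsed_ms": 0,  "cum_elapsed_ms": 0,  "handling_time": "2021-11-08T11:30:52.174Z" }
{ "transaction_id": "0000017cf17ef9c5-116feb", "step": "step-1", "elapsed_ms": 15, "cum_elapsed_ms": 15, "handling_time": "2021-11-08T11:30:52.189Z" }
{ "transaction_id": "0000017cf17ef9c5-116feb", "step": "step-2", "elapsed_ms": 36, "cum_elapsed_ms": 51, "handling_time": "2021-11-08T11:30:52.225Z" }
```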

Regards, Hans

Below are the pivot and aggregations parts:

    "pivot": {
        "group_by": {
            "transaction_id": {
                "terms": {
                    "field": "transactionId"
                }
            }
        },
        "aggregations": {
            "verwerkingstijden": {
                "scripted_metric": {
                    "init_script": "state.docs = []",
                    "map_script": """
                        Map span = [
                            'transaction_id': doc['transactionId'].value,
                            'handlingTime': doc['handlingTime'].value
                        ];
                        state.docs.add(span);
                    """,
                    "combine_script": "return state.docs",
                    "reduce_script": """
                        def ret = new HashMap();
                        def all_docs = [];
                        for (s in states) {
                            for (span in s) {
                                all_docs.add(span);
                            }
                        }
                        all_docs.sort((HashMap o1, HashMap o2) -> o1['handlingTime'].millis.compareTo(o2['handlingTime'].millis));
                        def no_docs = all_docs.size();
                        def total_elapsed_ms = all_docs[no_docs-1]['handlingTime'].millis - all_docs[0]['handlingTime'].millis;
                        all_docs[0]['elapsed_ms'] = 0;
                        all_docs[0]['cum_elapsed_ms'] = 0;
                        ret['step-' + 0] = all_docs[0];
                        for (int i = 1; i < no_docs; i++) {
                            all_docs[i]['elapsed_ms'] = all_docs[i]['handlingTime'].millis - all_docs[i-1]['handlingTime'].millis;
                            all_docs[i]['cum_elapsed_ms'] = all_docs[i]['handlingTime'].millis - all_docs[0]['handlingTime'].millis;
                            ret['step-' + i] = all_docs[i];
                        }
                        ret['total_elapsed_ms'] = total_elapsed_ms;
                        ret['start_time'] = all_docs[0]['handlingTime'];
                        ret['end_time'] = all_docs[no_docs-1]['handlingTime'];
                        return ret;
                    """
                }
            }
        }
    }

Unfortunately this is not possible at the moment, and it would be complex to implement.

Neither an ingest pipeline nor a transform supports this. Apart from that, it is quite tricky: a transform needs deterministic document IDs in order to update the destination index.

You could use several transforms and destinations. That's not ideal for performance, of course.


That is very unfortunate.
I was already looking into whether an ingest pipeline could do this.
Multiple transforms and destinations will not work, since the documents must be grouped together in order to calculate the relative elapsed times.
The documents need to end up in the same index, not different ones.

Danger Zone

You can have several transforms writing into the same destination; these transforms would do the very same calculation but only emit the values you need, so you get separate docs. The problem: the transforms overwrite each other's docs due to the way the document ID is generated.

But you can work around this if you rewrite _id yourself in an ingest pipeline using the fingerprint processor. As fields, use the ones from the group_by. This way updates will work, because the fingerprint will be the same for the same group_by values. To create different hashes for the individual transforms, use a different salt for every transform.
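A minimal sketch of such a pipeline (the pipeline name and salt value are my own; the fields list should match your group_by fields):

```json
PUT _ingest/pipeline/transform-step-1-id
{
  "processors": [
    {
      "fingerprint": {
        "fields": [ "transaction_id" ],
        "target_field": "_id",
        "salt": "step-1",
        "method": "SHA-256"
      }
    }
  ]
}
```

Each transform would then reference its own copy of this pipeline (with a different salt) in its dest settings.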

Hello Hendrik,
I appreciate your help very much.
However, I do not understand how multiple transforms with the fingerprint processor would help in this case.
Regards, Hans

There seems to be a demand for such a feature.

I understood that you want separate docs for every processing step. What you can do is create a transform for every step, assuming you know the steps and there are not hundreds of them but, e.g., 3. For that case you could create transform-step-1, transform-step-2, and transform-step-3. They are almost identical, but each filters for the particular step you are looking for.

The problem: as the group_by is identical for transform-step-1, transform-step-2, and transform-step-3, they would overwrite each other. The fingerprint processor can work around this.
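A sketch of one such transform (index and transform names are assumptions): the only differences between the per-step copies would be the aggregation's reduce script and the dest pipeline used for ID rewriting:

```json
PUT _transform/transform-step-1
{
  "source": {
    "index": "transactions"
  },
  "dest": {
    "index": "transaction-timings",
    "pipeline": "transform-step-1-id"
  },
  "pivot": { ... }
}
```

The pivot would contain the same group_by and scripted_metric as before, with the reduce script returning only step 1's values.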

Does that make it more clear?
Would that work for you?

No, it is not clear.

We have documents that contain transaction information.
They have their own message ID but share the same transaction ID.
Each transaction has 2 or more steps: it starts with a Frontend request and ends with a Frontend response.
In between there are 1 or more pairs of Backend requests and Backend responses (they should match):
if there is a backend request, there is also a response.
In this case we have 4 documents.
Now I want to calculate the elapsed times between each document and the total response time.

If 4 documents come in, 4 should come out, each with its elapsed time.
It could also be 6, 8, or 10.

By the way, the response always has a field corresponding_messageid that links every response to the request it belongs to,
but requests do not have that.

Regards, Hans

I understand that in your example you want 4 output docs: elapsed_1_2, elapsed_2_3, elapsed_3_4, and total_elapsed. Your script calculates all 4 values.

My workaround suggestion was to create 4 transforms that all calculate the 4 values, but in the reduce script return only 1 of the 4. That's ugly, I know, but it's the only way I see to implement your requirement with what's available today. (Except you can't name the 4 steps; that won't work, of course.)
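A sketch of such a per-step reduce script, assuming the same all_docs collection and field names as in your original script (the hard-coded step index would differ in every transform):

```painless
// reduce_script for transform-step-1: same collection, sort, and
// elapsed-time calculation as before, but only one step is returned
def all_docs = [];
for (s in states) {
    for (span in s) {
        all_docs.add(span);
    }
}
all_docs.sort((HashMap o1, HashMap o2) -> o1['handlingTime'].millis.compareTo(o2['handlingTime'].millis));
int i = 1; // hard-coded step index, different in every transform copy
def ret = new HashMap();
ret['transaction_id'] = all_docs[i]['transaction_id'];
ret['step'] = 'step-' + i;
ret['elapsed_ms'] = all_docs[i]['handlingTime'].millis - all_docs[i-1]['handlingTime'].millis;
ret['cum_elapsed_ms'] = all_docs[i]['handlingTime'].millis - all_docs[0]['handlingTime'].millis;
ret['handling_time'] = all_docs[i]['handlingTime'];
return ret;
```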

The problem is that there can be 6, 8, or 10 steps.
That would require the same number of transforms.

I see. Yes, it would require as many transforms as there are steps, which is hard-coded and would need to be maintained.

I am afraid there is no better way. I hope the ingest split gets implemented eventually.
