Transform for predefined fields

Hi Elastic team,

We have a use case where we created a continuous pivot transform on a source index to count transactions by status. If the status of a transaction changes, the same document appears in the destination index under the new status with a count of 1, so we end up with two documents, each with a status count of 1. However, further changes to the same document give the correct count. So we have two questions:

  1. We want to understand the delete behaviour of the transform API. Is it able to identify modified documents and re-aggregate them to show correct results?
  2. Is there a way to predefine the possible status values with a count of zero, so that the transform can then calculate the counts correctly?

Here's the transform:

PUT _transform/tradebystatusbucket
{
  "source": {
    "index": [
      "latest-transactions"
    ]
  },
  "pivot": {
    "group_by": {
      "submissionAccountName": {
        "terms": {
          "field": "submissionAccountName.keyword"
        }
      },
      "executingEntityIdCode": {
        "terms": {
          "field": "executingEntityIdCode.keyword"
        }
      },
      "regulator": {
        "terms": {
          "field": "regulator.keyword"
        }
      },
      "assetClass": {
        "terms": {
          "field": "assetClass.keyword"
        }
      },
      "status": {
        "terms": {
          "field": "status.keyword"
        }
      }
    },
    "aggs": {
      "keywords": {
        "value_count": {
          "field": "status.keyword"
        }
      }
    }
  },
  "frequency": "10s",
  "dest": {
    "index": "tradebystatusarmbucket"
  },
  "sync": {
    "time": {
      "field": "ingest_time",
      "delay": "1s"
    }
  },
  "settings": {
    "max_page_search_size": 500
  }
}

Thanks for the questions. To answer them, I'd like to take a step back:

If I understand correctly, you want one document containing only the latest status?

If that's correct, you shouldn't group by status; instead, get the status as part of the aggregations. You can use a top_metrics aggregation for this, similar to this example.

To still get counts by status, you can use a terms aggregation on status.keyword, or you can group them via filter aggregations, as in this example.
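As a sketch of what this could look like, the Python snippet below builds a pivot section (as a plain dict, printed as JSON) that moves status out of group_by and counts statuses in the aggregations instead. The KNOWN_STATUSES list and the status_<value> aggregation names are hypothetical. With one filter aggregation per expected status, each destination document should carry a count for every listed status, including 0, which may also cover the question about predefined values; the terms variant only lists statuses that actually occur. The exact output shape in the destination index is an assumption worth verifying against the transform documentation for your version.

```python
import json

# Hypothetical: the status values you expect to see; adjust to your data.
KNOWN_STATUSES = ["ABC", "CDE", "SMETHING"]

# The grouping fields from the original transform, minus status.
GROUP_FIELDS = ["submissionAccountName", "executingEntityIdCode", "regulator", "assetClass"]


def build_pivot(statuses):
    """Return a pivot section that groups by transaction attributes only
    and counts statuses inside each bucket."""
    group_by = {f: {"terms": {"field": f + ".keyword"}} for f in GROUP_FIELDS}
    aggs = {
        # One filter aggregation per expected status (its count can be 0).
        f"status_{s}": {"filter": {"term": {"status.keyword": s}}}
        for s in statuses
    }
    # Additionally (or instead): one terms aggregation; only statuses that occur show up.
    aggs["status_counts"] = {"terms": {"field": "status.keyword"}}
    return {"group_by": group_by, "aggregations": aggs}


print(json.dumps({"pivot": build_pivot(KNOWN_STATUSES)}, indent=2))
```

Keeping status out of group_by means a status change lands in the same bucket as before, so that bucket is recomputed rather than a new one being created.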

Does that answer your questions and/or help you implement the use case? If not, please let me know. A data example showing the input and the desired output would be helpful.

Thank you, Hendrik, for your reply. Maybe I didn't explain the problem very well.
For the first part of my question:
Our source index has this document:

{
        "_index": "latest-transactions",
        "_id": "Nl9kpQ9yF1TXHBQc-qCYdNl0AAAAAAAA",
        "_score": 1,
        "_source": {
          "ingest_time": "2022-07-08T18:57:09.153851949Z",
          "warningDescriptions": "Reported - non-MiFID eligibleeee",
          "submissionAccountName": "ACCOUNT1",
          "warnings": "W6009",
          "tradeRef": "ref1",
          "assetClass": "EQUI",
          "initial_ingest_time": "2022-07-08T15:41:15.251916742Z",
          "newfield2": "new field",
          "regulator": "BaFIN",
          "instructionId": "ins2",
          "payload_ts": 1657191138648,
          "executingEntityIdCode": "635400BDQCJNMOGTBB61",
          "status": "ABC"
        }
      }

The transform given above produces this document in the destination index, showing that there's one document with status ABC:

{
        "_index": "tradebystatusarm",
        "_id": "RTZCQUEymvCVMlUdIi-_ZNvQR9cCAAAA",
        "_score": 1,
        "_source": {
          "submissionAccountName": "ACCOUNT1",
          "regulator": "BaFIN",
          "statuscount": {
            "count": 1
          },
          "assetClass": "EQUI",
          "executingEntityIdCode": "635400BDQCJNMOGTBB61",
          "status": "ABC"
        }
      }

Now, if the status of the document in the source index changes to, say, CDE:

{
        "_index": "latest-transactions",
        "_id": "Nl9kpQ9yF1TXHBQc-qCYdNl0AAAAAAAA",
        "_score": 1,
        "_source": {
          "ingest_time": "2022-07-11T07:44:51.526700855Z",
          "warningDescriptions": "Reported - non-MiFID eligibleeee",
          "submissionAccountName": "ACCOUNT1",
          "warnings": "W6009",
          "tradeRef": "ref1",
          "assetClass": "EQUI",
          "initial_ingest_time": "2022-07-08T15:41:15.251916742Z",
          "newfield2": "new field",
          "regulator": "BaFIN",
          "instructionId": "ins2",
          "payload_ts": 1657191138648,
          "executingEntityIdCode": "635400BDQCJNMOGTBB61",
          "status": "CDE"
        }
      }

I see two documents in my destination index, showing one document with status ABC and one document with status CDE:

{
        "_index": "tradebystatusarm",
        "_id": "RTZCQUEymvCVMlUdIi-_ZNvQR9cCAAAA",
        "_score": 1,
        "_source": {
          "submissionAccountName": "ACCOUNT1",
          "regulator": "BaFIN",
          "statuscount": {
            "count": 1
          },
          "assetClass": "EQUI",
          "executingEntityIdCode": "635400BDQCJNMOGTBB61",
          "status": "ABC"
        }
      },
      {
        "_index": "tradebystatusarm",
        "_id": "RTZCQ0EIAVrLnmSvbuYwsiWh068uAAAA",
        "_score": 1,
        "_source": {
          "submissionAccountName": "ACCOUNT1",
          "regulator": "BaFIN",
          "statuscount": {
            "count": 1
          },
          "assetClass": "EQUI",
          "executingEntityIdCode": "635400BDQCJNMOGTBB61",
          "status": "CDE"
        }
      }
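A simplified Python model may help explain this result. It assumes, as a simplification of the real change detection, that on each checkpoint a continuous transform recomputes only the buckets whose group_by keys appear in documents that changed since the last checkpoint, and leaves every other destination document untouched. Because status is one of the group_by keys, the update to CDE only touches the CDE bucket, so the old ABC bucket is never revisited. The helper names (pivot, checkpoint) are hypothetical.

```python
from collections import Counter

BASE = {
    "submissionAccountName": "ACCOUNT1",
    "executingEntityIdCode": "635400BDQCJNMOGTBB61",
    "regulator": "BaFIN",
    "assetClass": "EQUI",
}
WITH_STATUS = tuple(BASE) + ("status",)
WITHOUT_STATUS = tuple(BASE)


def pivot(docs, keys):
    """Batch pivot: number of documents per composite bucket."""
    return dict(Counter(tuple(d[k] for k in keys) for d in docs))


def checkpoint(dest, source_docs, changed_docs, keys):
    """Simplified continuous update: recompute only the buckets touched by
    changed_docs (their current versions); keep all other destination docs."""
    fresh = pivot(source_docs, keys)
    touched = {tuple(d[k] for k in keys) for d in changed_docs}
    return {**dest, **{key: fresh[key] for key in touched}}


before = {**BASE, "status": "ABC"}
after = {**BASE, "status": "CDE"}  # same _id, status updated in place

# Grouping by status: the stale ABC bucket survives next to the new CDE bucket.
dest = checkpoint(pivot([before], WITH_STATUS), [after], [after], WITH_STATUS)
print(sorted(key[-1] for key in dest))  # ['ABC', 'CDE']

# Grouping without status: the single bucket is recomputed in place.
dest = checkpoint(pivot([before], WITHOUT_STATUS), [after], [after], WITHOUT_STATUS)
print(len(dest))  # 1
```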

What we want is for the transform to delete or ignore the previous document, as its status has changed, leaving only:

{
        "_index": "tradebystatusarm",
        "_id": "RTZCQ0EIAVrLnmSvbuYwsiWh068uAAAA",
        "_score": 1,
        "_source": {
          "submissionAccountName": "ACCOUNT1",
          "regulator": "BaFIN",
          "statuscount": {
            "count": 1
          },
          "assetClass": "EQUI",
          "executingEntityIdCode": "635400BDQCJNMOGTBB61",
          "status": "CDE"
        }
      }

Are we using the transform aggregation correctly?

Thanks,

As I said in my answer, you should not group by status, because you don't want to separate the docs per status. So status must move from group_by into the aggregations section. To get the latest status, you can use top_metrics:

"status": {
  "top_metrics": {
    "metrics": { "field": "status.keyword" },
    "sort": { "ingest_time": "desc" }
  }
}
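For intuition, top_metrics with a descending sort on ingest_time returns the requested field from the single document with the newest ingest_time in each bucket. The sketch below mimics that in plain Python; it is a model, not the Elasticsearch implementation, and it assumes ingest_time values are ISO-8601 UTC strings of identical format, so they sort correctly as strings. Note that it yields one value per bucket, not a count. The function name top_metric and the sample bucket (ref2 in particular) are hypothetical.

```python
def top_metric(bucket_docs, metric_field, sort_field="ingest_time"):
    """Mimic top_metrics with a descending sort: take the metric from the doc
    with the greatest sort value. Assumes same-format ISO-8601 UTC strings."""
    newest = max(bucket_docs, key=lambda d: d[sort_field])
    return {metric_field: newest[metric_field]}


# Hypothetical bucket: two transactions sharing the same group_by values.
bucket = [
    {"tradeRef": "ref1", "status.keyword": "ABC", "ingest_time": "2022-07-08T18:57:09.153851949Z"},
    {"tradeRef": "ref2", "status.keyword": "CDE", "ingest_time": "2022-07-11T07:44:51.526700855Z"},
]
print(top_metric(bucket, "status.keyword"))  # {'status.keyword': 'CDE'}
```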

Thanks Hendrik. Seems to be the right solution. :slightly_smiling_face:

Hi Hendrik,
There's a bit of a snag, though.

For these two documents in the source index:

{
        "_index": "latest-transactions",
        "_id": "NnLULrwn4Me522lTlVzUwIbJAAAAAAAA",
        "_score": 1,
        "_source": {
          "initial_ingest_time": "2022-07-11T10:03:00.611536236Z",
          "ingest_time": "2022-07-11T10:03:00.611536236Z",
          "transactionReferenceNumber": "ref1",
          "warningDescriptions": "Reported - non-MiFID eligibleeee",
          "submissionAccountName": "ACCOUNT1",
          "warnings": "W6009",
          "regulator": "BaFIN",
          "instructionId": "ins1",
          "assetClass": "EQUI",
          "payload_ts": "1657530982219",
          "executingEntityIdCode": "635400BDQCJNMOGTBB61",
          "status": "SMETHING"
        }
      },
      {
        "_index": "latest-transactions",
        "_id": "NnJtzRgray9JKJWDQ3aQCIE7AAAAAAAA",
        "_score": 1,
        "_source": {
          "initial_ingest_time": "2022-07-11T10:03:06.605101590Z",
          "ingest_time": "2022-07-11T10:03:06.605101590Z",
          "transactionReferenceNumber": "ref3",
          "warningDescriptions": "Reported - non-MiFID eligibleeee",
          "submissionAccountName": "ACCOUNT1",
          "warnings": "W6009",
          "regulator": "BaFIN",
          "instructionId": "ins3",
          "assetClass": "EQUI",
          "payload_ts": "1657530982219",
          "executingEntityIdCode": "635400BDQCJNMOGTBB61",
          "status": "SMETHING"
        }
      },

What if I want the count of these two documents, grouped by the unique fields? Using the solution you suggested gives me this document in the target index, which is not what we want:

      {
        "_index": "tradebystatusarm",
        "_id": "RTZCQUD6rtJHryURX1AtStIkqKUAAAAA",
        "_score": 1,
        "_source": {
          "status_name": {
            "status.keyword": "SMETHING"
          },
          "submissionAccountName": "ACCOUNT1",
          "regulator": "BaFIN",
          "assetClass": "EQUI",
          "executingEntityIdCode": "635400BDQCJNMOGTBB61"
        }
      }

What we want is to see a count of 2 instead of 1.

What do you mean by "count as 2"?

Is this related to the keywords value_count aggregation from the first post?

To count the number of docs, you can run a value_count on one of your other group_by fields instead, e.g. regulator.keyword.

Having the status keyword in the aggregation gives the count correctly, but it doesn't tell us the value of that status keyword.

This is what I have in place, and it doesn't seem to be working:

"aggregations": {
  "status_count": {
    "top_metrics": {
      "metrics": { "field": "status.keyword" },
      "sort": { "ingest_time": "desc" }
    }
  },
  "status_keyword": {
    "value_count": {
      "field": "status.keyword"
    }
  }
}
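To see why this configuration returns a status but not a per-status count, here is a small Python model of three aggregations applied to the two SMETHING documents above, which fall into the same bucket: top_metrics yields a single (latest) status, value_count yields the total number of documents with a status.keyword value (2), and a terms aggregation on status.keyword, as suggested earlier in the thread, yields a count per status value. The function names are hypothetical, and the dict shape shown for the terms result is an assumption about how the transform would write it to the destination index.

```python
from collections import Counter

# The two source documents from the post above (only the relevant fields).
bucket = [
    {"status.keyword": "SMETHING", "ingest_time": "2022-07-11T10:03:00.611536236Z"},
    {"status.keyword": "SMETHING", "ingest_time": "2022-07-11T10:03:06.605101590Z"},
]


def top_metrics(docs, field, sort_field="ingest_time"):
    """Latest value of `field` (descending sort on a same-format ISO-8601 string)."""
    return {field: max(docs, key=lambda d: d[sort_field])[field]}


def value_count(docs, field):
    """Number of docs that have a value for `field` (single-valued fields)."""
    return sum(1 for d in docs if d.get(field) is not None)


def terms_counts(docs, field):
    """Document count per distinct value, like a terms aggregation."""
    return dict(Counter(d[field] for d in docs if field in d))


print(top_metrics(bucket, "status.keyword"))   # {'status.keyword': 'SMETHING'}
print(value_count(bucket, "status.keyword"))   # 2
print(terms_counts(bucket, "status.keyword"))  # {'SMETHING': 2}
```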
