Transformation 'pivot' has value null

Hello :wave:
I have a question regarding transforms.
There is a source index with a dedicated transform that groups by a 'pivot' field.
The pivot can be a single field or a concatenation of multiple fields.
For example:
'fullName': 'FirstName|LastName', where the FirstName value comes from the firstName field of the source index and the LastName value comes from the lastName field of the source index.
If the lastName field becomes null in the source index, how will the transform group by the pivot? How will the existing document be updated?
The transform has frequency: "1m" and syncs on ingest_timestamp with a delay of 60s.

Thank you


Hi!

The pivot can be one field or concatenation of multiple fields

How do you achieve concatenation? Do you use a script or runtime field?

how will the transform group by the pivot

Do you have two separate group-by clauses for first and last names?
Or just one for fullName?

The transform has frequency: "1m" and syncs on ingest_timestamp with a delay of 60s

Could you provide the full transform config and a few rows from the source index?

Hi @przemekwitek

  1. How do you achieve concatenation? Do you use a script or runtime field? -> Concatenation is done with an ingest pipeline processor.
  2. Do you have two separate group-by clauses for first and last names?
    Or just one for fullName? -> Separate firstName/lastName fields.
  3. Could you provide full transform config and a few rows from the source index?
    Source index (person-data-index):
"hits": [
      {
        "_index": "person-data-index",
        "_id": "x-V2HI0BU2N54XdmKHXE",
        "_score": 1,
        "_source": {
          "firstName": "FirstName1",
          "lastName": "LastName1",
          "zipCode": null,
          "personId": "personId",
          "ingest_timestamp": "2024-01-22T12:29:45.480638977Z",
          "source": "source-1"
        }
      },
      {
        "_index": "person-data-index",
        "_id": "jfl2HI0BWquxb6JedpIc",
        "_score": 1,
        "_source": {
          "firstName": "FirstName2",
          "lastName": "LastName2",
          "zipCode": "12345",
          "personId": "personId-1",
          "ingest_timestamp": "2024-01-18T12:05:42.043752715Z",
          "source": "source-1"
        }
      },
      {
        "_index": "person-data-index",
        "_id": "yOV2HI0BU2N54Xdmj3UR",
        "_score": 1,
        "_source": {
          "firstName": "FirstName2",
          "lastName": "LastName2",
          "zipCode": "12345",
          "personId": "personId-1",
          "ingest_timestamp": "2024-01-18T12:05:48.433166779Z",
          "source": "source-2"
        }
      },
      {
        "_index": "person-data-index",
        "_id": "jPl2HI0BWquxb6JeUJKg",
        "_score": 1,
        "_source": {
          "firstName": "FirstName1",
          "lastName": "LastName1",
          "zipCode": null,
          "personId": "personId",
          "ingest_timestamp": "2024-01-22T12:30:45.237586836Z",
          "source": "source-2"
        }
      }
    ]

transform:

{
  "id": "dest-transformation-personal-data-001",
  "authorization": {
    "roles": [
      "superuser"
    ]
  },
  "version": "8.7.0",
  "create_time": 1705925962383,
  "source": {
    "index": [
      "transformed-personal-data-index"
    ],
    "query": {
      "bool": {
        "must": [
          {
            "exists": {
              "field": "combined.firstName"
            }
          },
          {
            "exists": {
              "field": "combined.lastName"
            }
          },
          {
            "exists": {
              "field": "combined.zipCode"
            }
          }
        ]
      }
    }
  },
  "dest": {
    "index": "dest-personal-data-index",
    "pipeline": "ingest_timestamp_pipeline"
  },
  "frequency": "1m",
  "sync": {
    "time": {
      "field": "ingest_timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "combined.concatenatedPersonalData": {
        "terms": {
          "field": "combined.concatenatedPersonalData"
        }
      }
    },
    "aggregations": {
      "values": {
        "scripted_metric": {
          "init_script": "\n                state.doc = [];\n            ",
          "map_script": "\n                state.doc.add(params['_source']['combined']);\n            ",
          "combine_script": "return state",
          "reduce_script": "\n                def result = [];\n                for (state in states) {\n                    for (s in state.doc) {\n                        result.add(s);\n                    }\n                }\n                return result;\n            "
        }
      }
    }
  },
  "settings": {
    "max_page_search_size": 500
  }
}

result index:

"hits": [
      {
        "_index": "dest-personal-data-index",
        "_id": "RoIoqTPJe2AGm__QmnP8A2UAAAAAAAAA",
        "_score": 1,
        "_source": {
          "ingest_timestamp": "2024-01-22T12:19:23.322437191Z",
          "values": [
            {
              "firstName": "FirstName1",
              "lastName": "LastName1",
              "zipCode": "123456",
              "personId": "personId",
              "ingest_timestamp": "2024-01-18T12:05:32.448356717Z",
              "concatenatedPersonalData": "FirstName1|LastName1|123456",
              "source": "source-2"
            }
          ],
          "combined": {
            "concatenatedPersonalData": "FirstName1|LastName1|123456"
          }
        }
      },
      {
        "_index": "dest-personal-data-index",
        "_id": "RpRt_CQd6HrBNOdLZGKGfR4AAAAAAAAA",
        "_score": 1,
        "_source": {
          "ingest_timestamp": "2024-01-22T12:19:23.322574528Z",
          "values": [
            {
              "firstName": "FirstName2",
              "lastName": "LastName2",
              "zipCode": "12345",
              "personId": "personId-1",
              "ingest_timestamp": "2024-01-18T12:05:48.433166779Z",
              "concatenatedPersonalData": "FirstName2|LastName2|12345",
              "source": "source-2"
            }
          ],
          "combined": {
            "concatenatedPersonalData": "FirstName2|LastName2|12345"
          }
        }
      }
    ]

So it seems that you have some other ingest pipeline (not ingest_timestamp_pipeline, which is applied to documents ingested into the destination index) that sets the combined.concatenatedPersonalData field in the source index (transformed-personal-data-index). Is that correct?
It's strange, though, as I cannot see the combined.concatenatedPersonalData field in the source index (transformed-personal-data-index) data you provided...

Basically, how you concatenate the individual pieces (first name, last name, zip code) into a single field is up to the ingest processor, not the transform itself.
You need to make sure the ingest processor handles null values correctly. I could imagine it producing strings like FirstName1|<NULL>|123456 or FirstName1|LastName1|<NULL>.
The transform then simply applies a terms aggregation over those strings.
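To illustrate the placeholder idea, here is a rough Python model (not Elasticsearch code) of a null-aware concatenation followed by terms-style grouping. The `<NULL>` marker, the `|` separator and the function names are assumptions for illustration only; in a real setup this logic would live in the ingest pipeline, for example in a script processor.

```python
from typing import Any, Dict, List, Mapping, Sequence

# Hypothetical placeholder for null/missing values, as suggested in the thread.
NULL_PLACEHOLDER = "<NULL>"


def concat_key(doc: Mapping[str, Any], fields: Sequence[str],
               sep: str = "|", placeholder: str = NULL_PLACEHOLDER) -> str:
    """Join the given fields with `sep`, using `placeholder` for null or missing values."""
    parts = []
    for field in fields:
        value = doc.get(field)
        parts.append(placeholder if value is None else str(value))
    return sep.join(parts)


def group_by_key(docs: Sequence[Mapping[str, Any]],
                 fields: Sequence[str]) -> Dict[str, List[Mapping[str, Any]]]:
    """Rough model of a terms group_by: one bucket per distinct concatenated key."""
    buckets: Dict[str, List[Mapping[str, Any]]] = {}
    for doc in docs:
        buckets.setdefault(concat_key(doc, fields), []).append(doc)
    return buckets


if __name__ == "__main__":
    fields = ["firstName", "lastName", "zipCode"]
    docs = [
        {"firstName": "FirstName1", "lastName": "LastName1", "zipCode": None},
        {"firstName": "FirstName2", "lastName": "LastName2", "zipCode": "12345"},
        {"firstName": "FirstName2", "lastName": "LastName2", "zipCode": "12345"},
    ]
    for key, bucket in group_by_key(docs, fields).items():
        print(key, len(bucket))
    # FirstName1|LastName1|<NULL> 1
    # FirstName2|LastName2|12345 2
```

The point of an explicit marker is that a null value still produces a distinct, recognisable key instead of silently collapsing into something that looks like a valid value.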

Hi @przemekwitek,
Sorry, I missed the other part.
This is the concatenation pipeline:

"ingest_timestamp_and_personal_data_concat": {
    "description": "Adds timestamp, combined personal data",
    "processors": [
      {
        "set": {
          "field": "ingest_timestamp",
          "value": "{{_ingest.timestamp}}"
        }
      },
      {
        "set": {
          "field": "combined.concatenatedPersonalData",
          "value": "{{{combined.firstName}}}|{{{combined.lastName}}}|{{{combined.zipCode}}}"
        }
      }
    ]
  }
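The middle index output further down (FirstName1|LastName1| for a document whose zipCode is null) suggests that the triple-brace mustache tags in this set processor render a null value as an empty string. Below is a small Python approximation of that observed behaviour; it is not the actual Elasticsearch template engine, and the regex and helper names are assumptions. Treating a missing field the same way as null is also an assumption of this sketch.

```python
import re
from typing import Any, Mapping

# Matches triple-brace tags such as {{{combined.firstName}}}.
TRIPLE_TAG = re.compile(r"\{\{\{\s*([\w.]+)\s*\}\}\}")

# The template used by the set processor in ingest_timestamp_and_personal_data_concat.
TEMPLATE = "{{{combined.firstName}}}|{{{combined.lastName}}}|{{{combined.zipCode}}}"


def lookup(doc: Mapping[str, Any], dotted_path: str) -> Any:
    """Resolve a dotted path such as 'combined.zipCode'; None if any step is absent."""
    current: Any = doc
    for part in dotted_path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def render(template: str, doc: Mapping[str, Any]) -> str:
    """Approximate the observed rendering: null (or missing) values become ''."""
    def substitute(match):
        value = lookup(doc, match.group(1))
        return "" if value is None else str(value)
    return TRIPLE_TAG.sub(substitute, template)


if __name__ == "__main__":
    doc = {"combined": {"firstName": "FirstName1", "lastName": "LastName1", "zipCode": None}}
    print(render(TEMPLATE, doc))  # FirstName1|LastName1|
```

In other words, a null field does not fail the pipeline; it just shortens the key, so "FirstName1|LastName1|" becomes a separate terms bucket from "FirstName1|LastName1|123456".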

This is the middle transform, together with its destination index (transformed-personal-data-index). It is an intermediate transform that our use case requires.

middle_transformation:

{
    "id": "transform-personal-data-001",
    "authorization": {
      "roles": [
        "superuser"
      ]
    },
    "version": "8.7.0",
    "create_time": 1705581545221,
    "source": {
      "index": [
        "person-data-index"
      ],
      "query": {
        "match_all": {}
      }
    },
    "dest": {
      "index": "transformed-personal-data-index",
      "pipeline": "ingest_timestamp_and_personal_data_concat"
    },
    "frequency": "1m",
    "sync": {
      "time": {
        "field": "ingest_timestamp",
        "delay": "60s"
      }
    },
    "pivot": {
      "group_by": {
        "personId": {
          "terms": {
            "field": "personId"
          }
        }
      },
      "aggregations": {
        "combined": {
          "scripted_metric": {
            "init_script": "\n                state.zipCode = null;\n                state.person = [:];\n            ",
            "map_script": "\n                state.person = new HashMap(params['_source']);\n            ",
            "combine_script": "return state",
            "reduce_script": "\n                def result = [:];\n                def person = [:];\n                for (s in states) {\n                    if (s.person.size() > 0) {\n                        person.putAll(s.person);\n                    }\n                }\n                person.put('personId', person.get('personId'));\n                result.putAll(person);\n                return result\n            "
          }
        }
      }
    },
    "settings": {
      "max_page_search_size": 500
    }
  }
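Reading the scripted_metric above: map_script replaces state.person on every document, and reduce_script merges the shard states with putAll, so effectively the last document seen wins. The Python mirror below is my reading of those scripts, not Elasticsearch code. It suggests that when documents for the same personId are spread over shards, the surviving zipCode (including a null one) may depend on the order in which shard states are reduced. `reduce_latest` is a hypothetical deterministic variant, not part of the posted config.

```python
from typing import Any, Dict, List


def map_shard(docs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Mirror of map_script: each document replaces state.person, so the last one seen wins."""
    state: Dict[str, Any] = {"person": {}}
    for doc in docs:
        state["person"] = dict(doc)
    return state


def reduce_states(states: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Mirror of reduce_script: putAll over shard states, later states overwrite earlier keys."""
    person: Dict[str, Any] = {}
    for s in states:
        if len(s["person"]) > 0:
            # Like HashMap.putAll, a None (null) value overwrites a non-null one.
            person.update(s["person"])
    return person


def reduce_latest(states: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical variant: keep the person with the greatest ingest_timestamp.

    Assumes all timestamps are ISO-8601 strings in the same format, so string
    comparison matches chronological order.
    """
    candidates = [s["person"] for s in states if s["person"]]
    return dict(max(candidates, key=lambda p: p["ingest_timestamp"], default={}))


if __name__ == "__main__":
    older = map_shard([{"personId": "personId", "zipCode": "123456",
                        "ingest_timestamp": "2024-01-18T12:05:32.448356717Z"}])
    newer = map_shard([{"personId": "personId", "zipCode": None,
                        "ingest_timestamp": "2024-01-22T12:30:45.237586836Z"}])
    print(reduce_states([older, newer])["zipCode"])  # None
    print(reduce_states([newer, older])["zipCode"])  # 123456
    print(reduce_latest([newer, older])["zipCode"])  # None
```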

middle transformed index:

{
    "took": 9,
    "timed_out": false,
    "_shards": {
      "total": 2,
      "successful": 2,
      "skipped": 0,
      "failed": 0
    },
    "hits": {
      "total": {
        "value": 2,
        "relation": "eq"
      },
      "max_score": 1,
      "hits": [
        {
          "_index": "transformed-personal-data-index",
          "_id": "cF4m-w5GzFwJMLwThTxSx0YAAAAAAAAA",
          "_score": 1,
          "_source": {
            "personId": "personId",
            "ingest_timestamp": "2024-01-22T12:32:11.769394748Z",
            "combined": {
              "firstName": "FirstName1",
              "lastName": "LastName1",
              "zipCode": null,
              "personId": "personId",
              "ingest_timestamp": "2024-01-22T12:30:45.237586836Z",
              "concatenatedPersonalData": "FirstName1|LastName1|",
              "source": "source-2"
            }
          }
        },
        {
          "_index": "transformed-personal-data-index",
          "_id": "cOJk6APobfVthkGDQMzDnjQAAAAAAAAA",
          "_score": 1,
          "_source": {
            "personId": "personId-1",
            "ingest_timestamp": "2024-01-18T12:39:05.766290217Z",
            "combined": {
              "firstName": "FirstName2",
              "lastName": "LastName2",
              "zipCode": "12345",
              "personId": "personId-1",
              "ingest_timestamp": "2024-01-18T12:05:48.433166779Z",
              "concatenatedPersonalData": "FirstName2|LastName2|12345",
              "source": "source-2"
            }
          }
        }
      ]
    }
  }

The transform and destination index I sent previously read from this index.

If you compare the result index with the middle transformed index, you can see that the result document was not updated with the null zipCode value:
middle index docId: cF4m-w5GzFwJMLwThTxSx0YAAAAAAAAA
result index docId: RoIoqTPJe2AGm__QmnP8A2UAAAAAAAAA
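One hedged observation that may be relevant to this comparison: the destination transform's source query requires combined.zipCode to exist, and an exists query does not match a field whose value is an explicit null or [] (null values are not indexed unless a null_value is set in the mapping). If that applies here, the middle document cF4m-w5GzFwJMLwThTxSx0YAAAAAAAAA would be filtered out before grouping, so RoIoqTPJe2AGm__QmnP8A2UAAAAAAAAA is never touched. The Python sketch below approximates that filter; it is a model, not Elasticsearch code, and the helper names are made up.

```python
from typing import Any, Dict, List

# Fields required by the destination transform's source query (bool.must of exists queries).
REQUIRED_FIELDS = ["combined.firstName", "combined.lastName", "combined.zipCode"]


def get_path(doc: Dict[str, Any], dotted_path: str) -> Any:
    """Resolve a dotted path; None if any step is missing."""
    current: Any = doc
    for part in dotted_path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def exists(doc: Dict[str, Any], field: str) -> bool:
    """Approximate the exists query: missing, null and [] values count as absent."""
    value = get_path(doc, field)
    return value is not None and value != []


def source_query(docs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only documents where every required field exists."""
    return [d for d in docs if all(exists(d, f) for f in REQUIRED_FIELDS)]


if __name__ == "__main__":
    middle_docs = [
        {"personId": "personId",
         "combined": {"firstName": "FirstName1", "lastName": "LastName1", "zipCode": None}},
        {"personId": "personId-1",
         "combined": {"firstName": "FirstName2", "lastName": "LastName2", "zipCode": "12345"}},
    ]
    print([d["personId"] for d in source_query(middle_docs)])  # ['personId-1']
```

If this is the cause, dropping the exists clause for zipCode (or mapping a null_value) would let the null case reach the destination transform, although the grouping key would still change.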

So, do I understand correctly that you'd like to update an old document, changing its zipCode,
and then see that change in the result index?
There may be a problem with such a scenario.
There is an existing bug report about a similar case.
I'll try to reproduce it and let you know if I find anything.
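Independently of the bug report mentioned above, note that zipCode is part of the grouping key. When it changes from 123456 to null, combined.concatenatedPersonalData changes from FirstName1|LastName1|123456 to FirstName1|LastName1|, so the terms group_by sees a new bucket (and a new destination _id) rather than an update of the old one. The Python sketch below models this under an assumption: a pivot transform upserts the buckets it sees and leaves documents for buckets that no longer receive data in place unless something like a retention_policy removes them. It is a simplification, not the real checkpoint logic.

```python
from typing import Any, Dict, List


def run_checkpoint(dest: Dict[str, List[Dict[str, Any]]],
                   source_docs: List[Dict[str, Any]]) -> None:
    """Very rough model of one checkpoint of the destination transform.

    Buckets are keyed by combined.concatenatedPersonalData (the terms group_by) and
    collect the `combined` objects (like the `values` scripted_metric). Buckets are
    upserted; keys that no longer appear in the source are NOT removed (assumed
    behaviour, no retention_policy configured).
    """
    buckets: Dict[str, List[Dict[str, Any]]] = {}
    for doc in source_docs:
        key = doc["combined"]["concatenatedPersonalData"]
        buckets.setdefault(key, []).append(doc["combined"])
    dest.update(buckets)


if __name__ == "__main__":
    dest: Dict[str, List[Dict[str, Any]]] = {}
    before = {"combined": {"zipCode": "123456",
                           "concatenatedPersonalData": "FirstName1|LastName1|123456"}}
    after = {"combined": {"zipCode": None,
                          "concatenatedPersonalData": "FirstName1|LastName1|"}}
    run_checkpoint(dest, [before])
    run_checkpoint(dest, [after])  # same person, different key
    print(sorted(dest))  # ['FirstName1|LastName1|', 'FirstName1|LastName1|123456']
```

Both keys end up in the destination, and the old bucket still holds zipCode 123456, which matches what the result index shows.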

Exactly. I would expect that to be reflected in the result index.
Thanks a lot, that would be amazing!