Elasticsearch Transform not triggering

Hello, hope you can help me with a transform issue. I created a very simple transform to test with, set the frequency to 5s, and then started it with _transform/mytransform/_start. The stats say the transform is started, and the destination index has the initial data correctly. However, when I add new documents to the source index they never make it to the destination index; the stats for my transform also show operations_behind: 1, and that number just keeps increasing as I add more documents to the source index.
My cluster has 1 node with all the roles and permissions needed, but I think it is the free version, 7.11.1. Is there a limitation in the free version, or am I missing something in the transform that is needed for it to trigger? I was expecting the destination index to be updated every 5 seconds.
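For reference, the calls I'm running look roughly like this (mytransform is just the name of my test transform):

POST _transform/mytransform/_start
GET _transform/mytransform/_stats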
Thanks

There are no limitations dependent on the license. Transform can be used as part of the (free) basic license.

  • Can you post your configuration?
  • Did you specify sync and a corresponding timestamp field?
  • Did the documents you pushed contain a timestamp field whose value is not further in the past than delay? (See the sketch below.)
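As a minimal sketch, a continuous transform needs a sync block like this (the field name is just a placeholder; delay defaults to 60s if omitted):

"sync": {
    "time": {
        "field": "ingest_time",
        "delay": "60s"
    }
}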

Hello, thanks for the reply. You are right, I didn't specify sync, so the transform was not running continuously; I thought just setting the frequency was enough.
My documents do not have a timestamp field yet, but I guess I can add one automatically using an ingest pipeline and set it as the default pipeline for my indexes (see the sketch below); that should work, right?
Overall, I'm merging objects from two different indexes; the objects share one common field, which I'm using in pivot.group_by.
Also, I think the timestamp field must be named the same in both sources, so that any new document or update in either source index triggers the transform. Right?
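What I have in mind is roughly this (a sketch, untested; the pipeline name add_ingest_time is mine, and ingest_time is the field I plan to sync on):

PUT _ingest/pipeline/add_ingest_time
{
    "description": "Add an ingest_time field with the ingest timestamp",
    "processors": [
        {
            "set": {
                "field": "ingest_time",
                "value": "{{_ingest.timestamp}}"
            }
        }
    ]
}

PUT /person/_settings
{
    "index.default_pipeline": "add_ingest_time"
}

PUT /phone/_settings
{
    "index.default_pipeline": "add_ingest_time"
}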

When I first used cedula directly in the group_by, I got this error:

Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. Please use a keyword field instead. Alternatively, set fielddata=true on [cedula] in order to load field data by uninverting the inverted index. Note that this can use significant memory.

To avoid the above error, I had to specify cedula.keyword instead of just cedula in the group_by.
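If I understand it correctly, this is because with default dynamic mapping a string field like cedula is mapped as text with a keyword sub-field, roughly like this (a sketch of the relevant part of the mapping):

"cedula": {
    "type": "text",
    "fields": {
        "keyword": {
            "type": "keyword",
            "ignore_above": 256
        }
    }
}

Only the keyword sub-field supports aggregations like the terms group_by.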

I still don't know if the script I have in the scripted_metric is the best approach for my use case, or whether there might be an easier way?

Here is a sample of my objects

PUT /phone/_doc/1
{
    "cedula": "123",
    "celnumber": "88631765",
    "brand": "Pixel 4a",
    "ingest_time": "2021-04-22T04:38:57.389Z"
}

PUT /person/_doc/jose
{
    "cedula": "123",
    "name": "Jose",
    "lastname": "Longhi",
    "ingest_time": "2021-04-22T04:38:57.389Z"
}

Then the transformation looks like this

PUT _transform/test_transformation
{
    "description": "test transformation",
    "frequency": "10s",
    "sync": {
        "time": {
            "field": "ingest_time"
        }
    },
    "dest": {
        "index": "masterperson"
    },
    "source": {
        "index": [
            "person",
            "phone"
        ]
    },
    "pivot": {
        "group_by": {
            "cedula": {
                "terms": {
                    "field": "cedula.keyword"
                }
            }
        },
        "aggregations": {
            "contact": {
                "scripted_metric": {
                    "init_script": "state.doc = ''",
                    "map_script": "if(doc._index.value == 'person' ){state.doc = new HashMap(params['_source']) }",
                    "combine_script": "return state.doc",
                    "reduce_script": "return states[0]"
                }
            },
            "phones": {
                "scripted_metric": {
                    "init_script": "state.docs = []",
                    "map_script": "if(doc._index.value == 'phone' ){state.docs.add(new HashMap(params['_source']))}",
                    "combine_script": "return state.docs",
                    "reduce_script": "def docs = []; for (s in states) {for (d in s) { docs.add(d);}}return docs"
                }
            }
        }
    }
}

The final document in the destination index looks like this:

{
                "_index": "masterperson",
                "_type": "_doc",
                "_id": "MS6cp-nYNYWh0Vhq528Kx_QAAAAAAAAA",
                "_score": 1,
                "_source": {
                    "cedula": "123",
                    "contact": {
                        "name": "Jose",
                        "ingest_time": "2021-04-22T04:38:57.389Z",
                        "cedula": "123",
                        "lastname": "Longhi"
                    },
                    "phones": [
                        {
                            "ingest_time": "2021-04-22T04:38:57.389Z",
                            "telefono": "88631765",
                            "brand": "Pixel 4a",
                            "cedula": "123"
                        }
                    ]
                }
            }

Regarding the ingest timestamp, I recommend this.

Regarding your scripted metric: will you have duplicates in the data? E.g. what if a person updates their contact information? I wonder if you additionally want the latest value. Right now you return the 1st, which might not even be the 1st in terms of time or index order, but just the 1st that one shard happens to return.
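If it helps, here is a rough, untested sketch of what "keep the latest value" could look like for the contact aggregation, assuming ingest_time is mapped as a date; it tracks the greatest timestamp seen per shard and again across shards:

"contact": {
    "scripted_metric": {
        "init_script": "state.doc = null; state.ts = -1L",
        "map_script": "if (doc._index.value == 'person') { long t = doc['ingest_time'].value.toInstant().toEpochMilli(); if (t > state.ts) { state.ts = t; state.doc = new HashMap(params['_source']); } }",
        "combine_script": "return state",
        "reduce_script": "def best = null; long bestTs = -1L; for (s in states) { if (s.doc != null && s.ts > bestTs) { bestTs = s.ts; best = s.doc; } } return best"
    }
}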

Good point. As it is right now there will be no duplicates, since the person source index only keeps the latest value for a given cedula. However, I will take that into account if our inbound process changes; then we will need to pick the latest one if multiple are present.

Thank you very much! I'm kind of new to Elasticsearch and I'm trying to learn while I build.
