Next.checkpoint_progress.total_docs remains empty after first checkpoint

Hi,

We have defined a continuous transform job in Elastic Cloud, using a scripted group_by. We were previously bitten by this bug, so we upgraded Kibana to 7.9.2, and I recreated the transform job from scratch just to be sure.

Everything works fine until the first checkpoint. In the UI we can see the progress (number of documents to process, progress percentage...). Then after the first checkpoint, all checkpointing "details" are empty:
(screenshot: the checkpointing details panel is empty)

Looking at the destination index, I can see that new documents are slowly arriving, but I wonder if there is a performance issue, because they never seem to catch up. When creating the transform job, the first checkpoint (almost 3 years of history, 146M docs) took 2-3 days. I would then expect the next checkpoint to be very quick, since it only has to process the documents added during those 2-3 days (about 1M docs).

Is there a performance issue when using a scripted group_by?

Here are the stats, in case that helps:

Another suspicious thing is that documents_processed seems to increase beyond the number of docs we have in the source index.


Did you upgrade the whole stack? You need Elasticsearch >= 7.9.2, as this was a backend issue.

Using scripts in group_by comes with a cost; a non-scripted group_by is definitely cheaper. However, that does not mean you have a performance issue; it depends on other factors, e.g. your other group_bys, if you have any.

Do the stats change? Do you see that it's "moving"?

This is normal: transform might need to re-process data, so it will visit source documents again. How much depends on the transform job.

Can you share relevant parts of your config?

As a Cloud customer, I suggest making use of support; they can help gather the relevant information. Based on the information given I can't give any advice; I need at least the configuration and the raw output of GET _transform/{id}/_stats.

Hi @Hendrik_Muhs

Yes, ES is also 7.9.2.

Yes, I see documents_processed increasing, but the second checkpoint has still not been completed.

The source index is made of daily "pings" coming from our "entities". The particularity is that the only field we can consider as an "id" for an entity is a timestamp (let's call it "creation_time"). Each entity sends a ping every 24h.

So a source "ping" document looks like:

{
  "creation_time": "2019-04-08T09:25:18.398+02:00",
  "timestamp": "2020-10-11T17:57:58.084+02:00", // date when the ping has been received
  ... // various other fields
}

The goal of my transform job is to create an index that will contain only one document per entity. In this document, I did some aggregations, to keep track of the oldest ping timestamp, latest ping timestamp, and also the latest ping content (other fields).

Since the "group_by" field is a date (but I want it to be used as a long term, not to do a date_histogram group_by), the only working solution I found was to use a scripted group_by:

{
  "id": "my-transform-job",
  "source": {
    "index": [
      "search-all-pings"
    ],
    "query": {
      "bool": {
        "should": [
          {
            "exists": {
              "field": "creation_time" // some old pings don't have this field
            }
          }
        ],
        "minimum_should_match": 1
      }
    }
  },
  "dest": {
    "index": "unique-entities"
  },
  "sync": {
    "time": {
      "field": "timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "entity_id": {
        "terms": {
          "script": {
            "source": "return doc['creation_time']",
            "lang": "painless"
          }
        }
      }
    },
    "aggregations": {
      "first_ping_timestamp": {
        "min": {
          "field": "timestamp"
        }
      },
      "last_ping_timestamp": {
        "max": {
          "field": "timestamp"
        }
      },
      "latest_ping": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
          "map_script": "def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();\nif (current_date > state.timestamp_latest)\n        {state.timestamp_latest = current_date;\n        state.last_doc = new HashMap(params['_source']);}\n      ",
          "combine_script": "return state",
          "reduce_script": "def last_doc = '';\n        def timestamp_latest = 0L;\n        for (s in states) {if (s.timestamp_latest > (timestamp_latest))\n        {timestamp_latest = s.timestamp_latest; last_doc = s.last_doc;}}\n        return last_doc\n      "
        }
      }
    }
  },
  "description": "Group entity pings by creation_time",
}

GET _transform/my-transform-job/_stats

{
  "count" : 1,
  "transforms" : [
    {
      "id" : "my-transform-job",
      "state" : "indexing",
      "node" : {
        "id" : "xxx",
        "name" : "instance-0000000003",
        "ephemeral_id" : "xxx",
        "transport_address" : "x.x.x.x:xxx",
        "attributes" : { }
      },
      "stats" : {
        "pages_processed" : 7077,
        "documents_processed" : 211286812,
        "documents_indexed" : 3536714,
        "trigger_count" : 4,
        "index_time_in_ms" : 1189531,
        "index_total" : 7074,
        "index_failures" : 0,
        "search_time_in_ms" : 512912760,
        "search_total" : 7078,
        "search_failures" : 0,
        "processing_time_in_ms" : 51229,
        "processing_total" : 7077,
        "exponential_avg_checkpoint_duration_ms" : 3.74750827E8,
        "exponential_avg_documents_indexed" : 2497214.0,
        "exponential_avg_documents_processed" : 1.29913829E8
      },
      "checkpointing" : {
        "last" : {
          "checkpoint" : 1,
          "timestamp_millis" : 1602057191949,
          "time_upper_bound_millis" : 1602057131949
        },
        "next" : {
          "checkpoint" : 2,
          "position" : {
            "indexer_position" : {
              "entity_id" : "2019-07-20T03:14:03.334Z"
            }
          },
          "checkpoint_progress" : {
            "docs_indexed" : 1039500,
            "docs_processed" : 81372983
          },
          "timestamp_millis" : 1602431948952,
          "time_upper_bound_millis" : 1602431888952
        },
        "operations_behind" : 1438822
      }
    }
  ]
}

Let me know if you need more details, and thanks for your help.

Your configuration is highly problematic. You have only one group_by, and it uses a script; because of that, transform cannot optimize (there should be a warning in the job messages). That means on every checkpoint, transform will re-query all the data.

You can use terms on date fields, too; there is no need for the script. The Kibana UI does not offer this as a suggestion, but you can configure it in the advanced editor, like you did for the script.
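For example, the group_by section would then simply reference the field directly (same field name as in your script):

{
  "pivot": {
    "group_by": {
      "entity_id": {
        "terms": {
          "field": "creation_time"
        }
      }
    }
  }
}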

This will reduce the runtime of a checkpoint from a full re-run to only the changes. In detail, that means transform first gets all entities that changed between cp_current and cp_new, and then runs the pivot exclusively for those changes. Nevertheless, it has to re-access your source.

Conceptually it would be better if transform could just "update", but that's not how it works. Transform has to cover many use cases and isn't optimized for this one. However, we are looking into improvements based on feedback like this.

FYI I can't see any warning about that.

I tried to use terms on the date field, but I think I was bitten by several issues, including this one:

Among my source documents, some have a "crap" creation_time (we were not forcing the Gregorian calendar).

GET search-all-pings/_search
{
  "query": {
    "range": {
      "creation_time": {
        "lte": 0
      }
    }
  }
}

returns documents like:

{
  "creation_time": "1396-11-28T09:08:02.565+03:30",
  "timestamp": "2018-02-17T06:42:59.709+01:00"
}

And then trying to run the transform job without the scripted group_by:

{
  "id": "my-transform-job",
  "source": {
    "index": [
      "search-all-pings"
    ],
    "query": {
      "bool": {
        "should": [
          {
            "exists": {
              "field": "creation_time" // some old pings don't have this field
            }
          }
        ],
        "minimum_should_match": 1
      }
    }
  },
  "dest": {
    "index": "unique-entities"
  },
  "sync": {
    "time": {
      "field": "timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "entity_id": {
        "terms": {
          "field": "creation_time"
        }
      }
    },
    "aggregations": {
      "first_ping_timestamp": {
        "min": {
          "field": "timestamp"
        }
      },
      "last_ping_timestamp": {
        "max": {
          "field": "timestamp"
        }
      },
      "latest_ping": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
          "map_script": "def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();\nif (current_date > state.timestamp_latest)\n        {state.timestamp_latest = current_date;\n        state.last_doc = new HashMap(params['_source']);}\n      ",
          "combine_script": "return state",
          "reduce_script": "def last_doc = '';\n        def timestamp_latest = 0L;\n        for (s in states) {if (s.timestamp_latest > (timestamp_latest))\n        {timestamp_latest = s.timestamp_latest; last_doc = s.last_doc;}}\n        return last_doc\n      "
        }
      }
    }
  },
  "description": "Group entity pings by creation_time",
}

fails with an error like:

Failed to index documents into destination index due to permanent error: [BulkIndexingException[Bulk index experienced [8] failures and at least 1 irrecoverable [TransformException[Destination index mappings are incompatible with the transform configuration.]; nested: MapperParsingException[failed to parse field [creation_time] of type [date] in document with id '_0HgFgacZakzTfqDfQGszpQAAAAAAAAA'. Preview of field's value: '-18107428792863']; nested: IllegalArgumentException[failed to parse date field [-18107428792863] with format [strict_date_optional_time||epoch_millis]];

I tried different strategies to exclude those documents from the transform, but so far without any success.

Can you check the mappings of the destination index you created with the scripted group_by vs. the one you created without?

I think the difference is that the one with the script won't be mapped as date but e.g. as long; that's why you don't get a mapping conflict.

You can create the transform destination index yourself:

  • create the transform, but don't start it
  • create the destination index (you can use the suggested mappings from _preview)
  • start the transform
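With the names from your config, that could look like the following sketch (the mappings shown here are illustrative; take the real ones from the _preview response, which also covers the latest_ping field):

PUT _transform/my-transform-job
{
  ... config as above, but do not start it yet ...
}

PUT unique-entities
{
  "mappings": {
    "properties": {
      "entity_id": { "type": "long" },
      "first_ping_timestamp": { "type": "date" },
      "last_ping_timestamp": { "type": "date" }
    }
  }
}

POST _transform/my-transform-job/_start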

The mapping is date in both cases.

I tried to "force" the mapping to "long" by creating the destination index manually, but then I get a different error (possibly still due to some bad data):

task encountered irrecoverable failure: Failed to execute phase [query], Partial shards failure; shardFailures {[61SMzV9PS5yHxMSjDcuhRw][pings-2017][0]: RemoteTransportException[[instance-0000000000][x.x.x.x.:xxxx][indices:data/read/search[phase/query]]]; nested: IllegalArgumentException[invalid value, expected string, got Long]; }; [invalid value, expected string, got Long]; nested: IllegalArgumentException[invalid value, expected string, got Long];; java.lang.IllegalArgumentException: invalid value, expected string, got Long

Thanks for getting through this.

I was able to reproduce it; the difference between using the field and using the script seems to be the way transform handles the key.

For the script it looks like this:

"entity_id" : "1396-11-28T05:38:02.565Z"

while for the field it looks like this:

"entity_id" : -18084968517435

I think terms on a date should use the string value, I will create an issue for that.

I tested the suggested manual mapping (for the field-based group_by) and it worked, at least with demo data. The failure you posted is about a query problem, while the mapping only affects indexing, so this might be a different problem. I have a suspicion: according to the error message, it might be an issue in composite aggregations (used internally by transform). I will investigate this further and come back with more info.

In my case this won't be a concern, but what if two documents have the same date formatted differently (e.g. with different offsets):

2020-11-28T05:38:02.565Z
2020-11-28T09:08:02.565+03:30

are they going to be "grouped" if the strings are different but in fact represent the same instant?

Thanks a lot!

I think I was able to reproduce the problem. I created the following issue: https://github.com/elastic/elasticsearch/issues/63787. To get notified once a fix has been developed, you can subscribe to the issue.

Meanwhile, I suggest mitigating the problem by re-indexing the historic data:

  • create an ingest pipeline that takes creation_time and creates a new field from it, mapped as keyword
  • use the new field for your terms group_by

The same ingest pipeline can be attached to your new incoming data. I would attach the pipeline, roll over the incoming ingest to a new index, and then re-index all the existing historic indexes.

You might want to use a slightly enhanced pipeline for the historic data to fix your other data problems in one go.
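A sketch of such a pipeline (pipeline and destination index names are just examples; the set processor copies the raw date string into the new field, which you would then map as keyword):

PUT _ingest/pipeline/add-entity-id
{
  "processors": [
    {
      "set": {
        "if": "ctx.creation_time != null",
        "field": "entity_id",
        "value": "{{creation_time}}"
      }
    }
  ]
}

POST _reindex
{
  "source": { "index": "pings-2017" },
  "dest": { "index": "pings-2017-reindexed", "pipeline": "add-entity-id" }
}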

Thanks for the issue, I will track it. I have started to reindex all my documents, but I chose to convert the date to a long (I think it will be more robust against a possible date format change).


Hi @Hendrik_Muhs

So now I have reindexed all my documents with a pipeline that adds a new entity_id field, which is "simply" the creation_time as a long.

I tried to drop/recreate the transform job, but after only a few documents were processed, it fails with:

task encountered irrecoverable failure: Failed to execute phase [query], Partial shards failure; shardFailures {[Fya0HXoDRC2RaRcxXEYPUg][my-source-index-2017][0]: RemoteTransportException[[instance-0000000003][10.11.132.241:19708][indices:data/read/search[phase/query]]]; nested: IllegalArgumentException[invalid value, expected string, got Long]; }; [invalid value, expected string, got Long]; nested: IllegalArgumentException[invalid value, expected string, got Long];; java.lang.IllegalArgumentException: invalid value, expected string, got Long

The error is not very helpful.

What is really strange is that no documents from the my-source-index-2017 index should be selected, because the field creation_time (and consequently entity_id) is missing there.

GET /my-source-index-2017/_search
{
  "query": {
    "bool": {
        "should": [
          {
            "exists": {
              "field": "entity_id"
            }
          }
        ],
        "minimum_should_match": 1
    }
  }
}


{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  }
}

How can I find the source of the error?

Can you post the full output of GET _transform/{transform_id}/_stats? It sounds like a mapping problem to me. It would also be helpful if you could post the mappings of my-source-index-2017 and the transform configuration you are using now.

GET _transform/last-ping-by-entity-id/_stats

{
  "count" : 1,
  "transforms" : [
    {
      "id" : "last-ping-by-entity-id",
      "state" : "failed",
      "reason" : "task encountered irrecoverable failure: Failed to execute phase [query], Partial shards failure; shardFailures {[61SMzV9PS5yHxMSjDcuhRw][my-source-index-2017][0]: RemoteTransportException[[instance-0000000000][172.17.0.5:19484][indices:data/read/search[phase/query]]]; nested: IllegalArgumentException[invalid value, expected string, got Long]; }; [invalid value, expected string, got Long]; nested: IllegalArgumentException[invalid value, expected string, got Long];; java.lang.IllegalArgumentException: invalid value, expected string, got Long",
      "node" : {
        "id" : "Fya0HXoDRC2RaRcxXEYPUg",
        "name" : "instance-0000000003",
        "ephemeral_id" : "bnHdBBsFQu2DftjjLtGqxg",
        "transport_address" : "10.11.132.241:19708",
        "attributes" : { }
      },
      "stats" : {
        "pages_processed" : 1,
        "documents_processed" : 24562,
        "documents_indexed" : 500,
        "trigger_count" : 1,
        "index_time_in_ms" : 99,
        "index_total" : 1,
        "index_failures" : 0,
        "search_time_in_ms" : 337,
        "search_total" : 1,
        "search_failures" : 1,
        "processing_time_in_ms" : 4,
        "processing_total" : 1,
        "exponential_avg_checkpoint_duration_ms" : 0.0,
        "exponential_avg_documents_indexed" : 0.0,
        "exponential_avg_documents_processed" : 0.0
      },
      "checkpointing" : {
        "last" : {
          "checkpoint" : 0
        },
        "next" : {
          "checkpoint" : 1,
          "position" : {
            "indexer_position" : {
              "entity_id" : 1293846176892
            }
          },
          "checkpoint_progress" : {
            "docs_remaining" : 132094793,
            "total_docs" : 132119355,
            "percent_complete" : 0.018590765902543195,
            "docs_indexed" : 500,
            "docs_processed" : 24562
          },
          "timestamp_millis" : 1603111944169,
          "time_upper_bound_millis" : 1603111884169
        },
        "operations_behind" : 157699154
      }
    }
  ]
}

Job was created like that:

PUT _transform/last-ping-by-entity-id
{
  "source": {
    "index": [
      "search-all-entity-pings"
    ],
    "query": {
      "exists": {
        "field": "entity_id"
      }
    }
  },
  "dest": {
    "index": "pings-unique-entities"
  },
  "sync": {
    "time": {
      "field": "timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "entity_id": {
        "terms": {
          "field": "entity_id"
        }
      }
    },
    "aggregations": {
      "first_ping_timestamp": {
        "min": {
          "field": "timestamp"
        }
      },
      "last_ping_timestamp": {
        "max": {
          "field": "timestamp"
        }
      }
    }
  },
  "description": "Group pings by entity_id"
}

Here is the mapping of the my-source-index-2017:

GET /my-source-index-2017

{
  "my-source-index-2017" : {
    "aliases" : {
      "search-all-entity-pings" : { }
    },
    "mappings" : {
      "properties" : {
        "xxx_mode_used" : {
          "type" : "boolean"
        },
        "days_of_use" : {
          "type" : "long"
        },
        "days_since_installation" : {
          "type" : "long"
        },
        "host_version" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "entity_product" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "entity_version" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "timestamp" : {
          "type" : "date"
        },
        "type" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1589892473078",
        "number_of_shards" : "1",
        "number_of_replicas" : "1",
        "uuid" : "-RzxudpDTMO0x3xS4vtDWw",
        "version" : {
          "created" : "7070099"
        },
        "provided_name" : "my-source-index-2017"
      }
    }
  }
}

As you can see, in this old index no documents had the creation_time field, so no entity_id either.
Maybe there is an issue when one index in the source pattern/alias has no matches?

I tried to reproduce this issue, but I wasn't able to get the exact same error. It seems to me that your indexes somehow have conflicting mappings. This is not a transform issue in particular, but search related.

If you want to investigate further, you can try to run queries against search-all-entity-pings and see if you can reproduce it. As a next step, I would remove indexes one by one until it stops breaking.

What query should I try?

I'm already using this alias (including in visualizations) and I have no issues.

I just tried to run the transform job with my-source-index-2017 as the source index instead of the alias search-all-entity-pings, and it worked without error and very fast (since there are no documents with entity_id, this is expected).
I can try to do the same with all indexes that are part of the alias, but I would like to understand why the error message says the issue is in my-source-index-2017.

I think I found the issue. Because the field entity_id was missing in all documents of the index my-source-index-2017, it had no mapping in this index.
I forced the mapping to long in this index, and that seems to have fixed the issue:

PUT /my-source-index-2017/_mapping
{
  "properties": {
    "entity_id": {
      "type": "long"
    }
  }
}
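(A quick way to spot such per-index mapping gaps across an alias is the field capabilities API; with include_unmapped it also lists the indexes where the field is not mapped:)

GET search-all-entity-pings/_field_caps?fields=entity_id&include_unmapped=true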

So to sum up the issue:

  • have an index alias that "contains" 2 (or more) indexes
  • one index has no documents with the field that will be used in the transform group_by
  • this index also has no mapping for this field
  • the other indexes have the field, mapped as long
  • running a transform job with the index alias as source and the long field as group_by fails with:
IllegalArgumentException[invalid value, expected string, got Long]
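If someone wants to reproduce this outside of transform: transform paginates internally with a composite aggregation and an "after" key, so the failing request is presumably equivalent to something like the following (the "after" value here is the last indexer position from the stats above):

GET search-all-entity-pings/_search
{
  "size": 0,
  "aggs": {
    "entities": {
      "composite": {
        "sources": [
          { "entity_id": { "terms": { "field": "entity_id" } } }
        ],
        "after": { "entity_id": 1293846176892 }
      }
    }
  }
}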