Transform results in an index with missing documents when created via the API, but works from the console

Hi. We have developed a transform that works in the console. However, when we create the same transform (with the same source indices) via the golang TransformPutTransform API, many of the documents are missing from the resulting index.

Here is the console request that works:

PUT _transform/phils_test_01
{
  "source": {
    "index": ["axp_marketplace_search_products_1719409395_unique", "axp_marketplace_search_catalog_1_1720730404_unique"]
  },
  "dest": {
    "index": "phils_test_01"
  },
 "pivot": {
    "group_by": {
			"product_pk": {
				"terms": {
					"field": "product_pk"
				}
			}
		},
		"aggregations": {
			"doc_count": {
				"value_count": {
					"field": "product_pk"
				}
			},
			"products": {
				"scripted_metric": {
					"init_script":    "state.docs = []",
					"map_script":     "state.docs.add(new HashMap(params['_source']))",
					"combine_script": "return state.docs",
					"reduce_script": """
						  def hm = new HashMap();
							def si = states.iterator();
							while (si.hasNext()) {
							  def s = si.next();
							  def di = s.iterator();
							  while (di.hasNext()) {
							    def d = di.next();
									def tagsList = new ArrayList();
									def exceptTag = "Plus_except_1";
									def plusTag = "Plus";
									def mPlusTag;
									def mPlusExceptTag;
								  for (v in d.entrySet()) {
								    if ("catalog_date_created".equals(v.getKey()) || "last_image_upload_date_utc".equals(v.getKey())) {
									    def value = (String)v.getValue();
									    value = value.substring(0, (int)Math.min(value.length(), 19));
									    value = value.replace(' ', 'T');
									    if (!value.endsWith("Z") && value.length() == 19) {
									      value += "Z";
									    }
									    if (value.length() == 0) {
									      // Hack to get parsing to work.  Note, now does not work in painless.
									      value = "2019-07-17T12:13:14Z";
									    }
									    hm.put(v.getKey(), value);
								    } else {
									    hm.put(v.getKey(), v.getValue());
								    }
								    if ("tags".equals(v.getKey())) {
										  tagsList = v.getValue();
										  mPlusTag = null;
										  for (tag in tagsList) {
											  def tagLoc = tag.get("name_loc");
											  if (tagLoc == null) {
												    continue;
											  }
											  def enUs = tagLoc.get("en-US");
											  if (enUs.equalsIgnoreCase(plusTag)) {
											    mPlusTag = true;
											  }
										  }
									    if (mPlusTag != null) {
									      mPlusExceptTag = null;
												for (tag in tagsList) {
												  def tagLoc = tag.get("name_loc");
												  if (tagLoc == null) {
												  	continue;
													}
													def enUs = tagLoc.get("en-US");
													if (enUs.equalsIgnoreCase(exceptTag)) {
														mPlusExceptTag = true;
													}
												}
										  }
										hm.put("is_marketplace_plus", (mPlusTag != null && mPlusExceptTag == null));
										}
									}
									di.remove();
							  }
							  si.remove();
							}
							return hm;
              """
				}
			},
			"doc_count_filter": {
				"bucket_selector": {
					"buckets_path": {
						"count": "doc_count"
					},
					"script": "params.count == 2"
				}
			}
		}
	},
	"frequency": "10m",
	"sync": {
		"time": {
			"field": "@timestamp",
			"delay": "60s"
		}
	}
}

Here is the request body used in golang that produces an index with missing documents:

		transformConfig.Request = KeyValue{
			"source": KeyValue{
				"index": []string{
					productSourceIndexName,
					GetUniqueTransformName(sourceIndexName),
				},
			},
			"dest": KeyValue{
				"index": transformConfig.Name,
			},
			"pivot": KeyValue{
				"group_by": KeyValue{
					"product_pk": KeyValue{
						"terms": KeyValue{
							"field": "product_pk",
						},
					},
				},
				"aggregations": KeyValue{
					"doc_count": KeyValue{
						"value_count": KeyValue{
							"field": "product_pk",
						},
					},
					"products": KeyValue{
						"scripted_metric": KeyValue{
							"init_script":    "state.docs = []",
							"map_script":     "state.docs.add(new HashMap(params['_source']))",
							"combine_script": "return state.docs",
							"reduce_script": `
							def hm = new HashMap();
							def si = states.iterator();
							while (si.hasNext()) {
							  def s = si.next();
							  def di = s.iterator();
							  while (di.hasNext()) {
							    def d = di.next();
								def tagsList = new ArrayList();
								def exceptTag = "Plus_except_` + strconv.FormatInt(programPk, 10) + `";
								def plusTag = "Plus";
								def mPlusTag;
								def mPlusExceptTag;
								for (v in d.entrySet()) {
									if ("catalog_date_created".equals(v.getKey()) || "last_image_upload_date_utc".equals(v.getKey())) {
									    def value = (String)v.getValue();
									    value = value.substring(0, (int)Math.min(value.length(), 19));
									    value = value.replace(' ', 'T');
									    if (!value.endsWith("Z") && value.length() == 19) {
									      value += "Z";
									    }
									    if (value.length() == 0) {
									      // Hack to get parsing to work.  Note, now does not work in painless.
									      value = "2019-07-17T12:13:14Z";
									    }
									    hm.put(v.getKey(), value);
								    } else {
									    hm.put(v.getKey(), v.getValue());
								    }
							    	if ("tags".equals(v.getKey())) {
									  	tagsList = v.getValue();
									 	mPlusTag = null;
										for (tag in tagsList) {
											def tagLoc = tag.get("name_loc");
											if (tagLoc == null) {
												continue;
											}
											def enUs = tagLoc.get("en-US");
											if (enUs.equalsIgnoreCase(plusTag)) {
											  mPlusTag = true;
											}
										}
									    if (mPlusTag != null) {
									      	mPlusExceptTag = null;
											for (tag in tagsList) {
											  	def tagLoc = tag.get("name_loc");
												if (tagLoc == null) {
												 	continue;
												}
												def enUs = tagLoc.get("en-US");
												if (enUs.equalsIgnoreCase(exceptTag)) {
													mPlusExceptTag = true;
												}
											}
										  }
										hm.put("is_marketplace_plus", (mPlusTag != null && mPlusExceptTag == null));
										}
									}
									di.remove();
							  }
							  si.remove();
							}
							return hm;
						  `,
						},
					},
					"doc_count_filter": KeyValue{
						"bucket_selector": KeyValue{
							"buckets_path": KeyValue{
								"count": "doc_count",
							},
							"script": "params.count == 2",
						},
					},
				},
			},
			"frequency": "10m",
			"sync": KeyValue{
				"time": KeyValue{
					"field": "@timestamp",
					"delay": "60s", // default
				},
			},
		}

For example, with one program the console approach consistently produces 22,332 documents, while the golang approach produces a different document count each run; one run produced only 5,000 documents. We are using the TransformPutTransform golang API.

Does anyone have any idea why the golang approach would result in missing documents? It should be noted that the Transform page shows no error messages for either transform (console or golang).

Everything looks the same except for the source indices. Can you double-check that they resolve to the same values?

Can you check the JSON tab in the UI, or use the GET transform API, to see if there are differences in the resulting transform configurations?

I've looked at the JSON, and the source indices are the same.

The only difference I see in the JSON tab is the authorization. This is probably because the API uses a different user. Could it be privileges/roles?

It could be a difference in roles, yes. The expected behavior is that the transform will eventually fail if it was given insufficient permissions, though the user may also need additional permissions to execute Painless scripts (I think, unsure).

  • The transform remembers which roles the user that created it had at the time of creation and uses those same roles. If those roles do not have the required privileges on the source and destination indices, the transform fails when it attempts unauthorized operations. If you provide secondary authorization headers, those credentials are used instead.
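
To rule the permissions difference in or out, you can also pass secondary authorization on the put request so the transform runs with explicit credentials. A rough sketch using the go-elasticsearch esapi package (the helper name, user, and error handling here are mine; double-check the es-secondary-authorization header against the docs for your version):

package transforms

import (
	"context"
	"encoding/base64"
	"fmt"
	"net/http"
	"strings"

	elasticsearch "github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
)

// putTransformAs creates a transform that runs with the given user's roles
// (via secondary authorization) rather than the roles of the API client's user.
func putTransformAs(es *elasticsearch.Client, transformID, body, user, password string) error {
	creds := base64.StdEncoding.EncodeToString([]byte(user + ":" + password))

	req := esapi.TransformPutTransformRequest{
		TransformID: transformID,
		Body:        strings.NewReader(body),
		Header: http.Header{
			// The transform is created by the client's user but will run
			// with the roles of the user encoded here.
			"es-secondary-authorization": []string{"Basic " + creds},
		},
	}

	res, err := req.Do(context.Background(), es)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.IsError() {
		return fmt.Errorf("put transform: %s", res.String())
	}
	return nil
}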

If the JSON configs are otherwise the same, it doesn't really matter how they got there: the same transform config should produce the same results. I'd look into the permissions angle first; if that doesn't turn anything up, check whether there are any warnings/errors in the logs.

You can increase verbosity of transform logs if need be:

PUT /_cluster/settings
{
   "transient": {
      "logger.org.elasticsearch.xpack.transform.transforms": "debug"
   }
}

Thanks. I just realized that one of the input indices is itself generated in code by another transform. It may be that this input index is not complete by the time the second transform (the one in the code in the original post) is started. Is there any way in golang to get a notification when a started transform has completed? Failing that, can we wait for the started transform to finish somehow?

There isn't that kind of notification or eventing (that I know of).

Is the architecture something like one transform writing to an index, then a second transform reading from that index and pushing to another index?

If it is a non-continuous transform (i.e. it does not have sync and frequency in the config), the only way is to poll the stats API and wait for the transform to move into the STOPPED state. If it is a continuous transform, the stats API has a "last checkpoint" timestamp that is updated when the transform finishes its current iteration. The golang service/process would basically have to busy-wait while polling for updates, though.
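
To make that concrete, here is a rough busy-wait sketch for the batch case using the go-elasticsearch esapi package (the helper name, poll interval, and trimmed response struct are mine; for the continuous case you would compare the checkpointing fields across polls instead):

package transforms

import (
	"context"
	"encoding/json"
	"time"

	elasticsearch "github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
)

// waitForTransformStopped polls the transform stats API until a batch
// transform reaches the "stopped" state or the context is cancelled.
func waitForTransformStopped(ctx context.Context, es *elasticsearch.Client, transformID string) error {
	// Only the field we need from GET _transform/<id>/_stats.
	type stats struct {
		Transforms []struct {
			State string `json:"state"`
		} `json:"transforms"`
	}

	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		req := esapi.TransformGetTransformStatsRequest{TransformID: transformID}
		res, err := req.Do(ctx, es)
		if err != nil {
			return err
		}

		var s stats
		err = json.NewDecoder(res.Body).Decode(&s)
		res.Body.Close()
		if err != nil {
			return err
		}

		if len(s.Transforms) > 0 && s.Transforms[0].State == "stopped" {
			return nil // the batch transform has finished
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// poll again
		}
	}
}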

Alternatively, you could add an ingest pipeline to the destination index of the first transform, and have that pipeline stamp each document with the current timestamp as it is indexed. The second transform can then use that time field to identify changes and begin its work.

Is the architecture something like one transform writing to an index, then a second transform reading from that index and pushing to another index?

Yes that's it, and the first and second transforms are both continuous.

The second transform can use that time field to identify changes and then begin its work.

Are there any examples on how to do this?

None that I could find; it should be something like:

  1. add a pipeline that sets an event.ingested field (a sketch is shown after this list), similar to How to add time of ingestion to the document?
  2. in the first transform, add the pipeline to the destination: "dest": { "index": "phils_test_01", "pipeline": "pipeline_add_ingest_timestamp" },
  3. in the second transform, add the event.ingested field as the sync field: "sync": { "time": { "field": "event.ingested", "delay": "60s" } }
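
For step 1, the pipeline could look something like this (the pipeline name matches the dest example above; the description text is mine):

PUT _ingest/pipeline/pipeline_add_ingest_timestamp
{
  "description": "Stamp each document with the time it was indexed",
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}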

Ok. I've been able to try your solution, but it results in no documents in the destination index. When I look at the transform's Preview tab, there are many entries, so I am puzzled by this. Could it be that the source index was not created before the transform started?

I noticed that the event.ingested date appears in every document of the source index (for the second transform). Is that how it should be?

Even when I run it through the console, I now get no documents in the final index, so I don't think it's a race condition.

When I change the final transform's sync field from event.ingested to @timestamp, it works in the console, so I don't think I messed up the implementation of your solution. However, I could be wrong.

Ok. I fixed it. Our final transform takes two indices and combines them into one. I thought that putting the event.ingested pipeline on only one of the source indices would be enough; it turns out I needed it on both. Now it seems to work. I will do some more testing, but thanks, Patrick, as your solution worked.
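
For anyone hitting the same thing, one way to verify that every source index carries the sync field is an exists check like the one below (index names taken from the original post); a non-zero hit count means some documents will never match the sync query:

GET axp_marketplace_search_products_1719409395_unique,axp_marketplace_search_catalog_1_1720730404_unique/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must_not": {
        "exists": { "field": "event.ingested" }
      }
    }
  }
}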
