Hi. We have developed a transform that works given the same input indices in the console. However, when using the golang TransformPutTransform API, many of the documents are missing in the resulting index.
Here is the code in the console that works:
PUT _transform/phils_test_01
{
"source": {
"index": ["axp_marketplace_search_products_1719409395_unique", "axp_marketplace_search_catalog_1_1720730404_unique"]
},
"dest": {
"index": "phils_test_01"
},
"pivot": {
"group_by": {
"product_pk": {
"terms": {
"field": "product_pk"
}
}
},
"aggregations": {
"doc_count": {
"value_count": {
"field": "product_pk"
}
},
"products": {
"scripted_metric": {
"init_script": "state.docs = []",
"map_script": "state.docs.add(new HashMap(params['_source']))",
"combine_script": "return state.docs",
"reduce_script": """
def hm = new HashMap();
def si = states.iterator();
while (si.hasNext()) {
def s = si.next();
def di = s.iterator();
while (di.hasNext()) {
def d = di.next();
def tagsList = new ArrayList();
def exceptTag = "Plus_except_1";
def plusTag = "Plus";
def mPlusTag;
def mPlusExceptTag;
for (v in d.entrySet()) {
if ("catalog_date_created".equals(v.getKey()) || "last_image_upload_date_utc".equals(v.getKey())) {
def value = (String)v.getValue();
value = value.substring(0, (int)Math.min(value.length(), 19));
value = value.replace(' ', 'T');
if (!value.endsWith("Z") && value.length() == 19) {
value += "Z";
}
if (value.length() == 0) {
// Hack to get parsing to work. Note, now does not work in painless.
value = "2019-07-17T12:13:14Z";
}
hm.put(v.getKey(), value);
} else {
hm.put(v.getKey(), v.getValue());
}
if ("tags".equals(v.getKey())) {
tagsList = v.getValue();
mPlusTag = null;
for (tag in tagsList) {
def tagLoc = tag.get("name_loc");
if (tagLoc == null) {
continue;
}
def enUs = tagLoc.get("en-US");
if (enUs.equalsIgnoreCase(plusTag)) {
mPlusTag = true;
}
}
if (mPlusTag != null) {
mPlusExceptTag = null;
for (tag in tagsList) {
def tagLoc = tag.get("name_loc");
if (tagLoc == null) {
continue;
}
def enUs = tagLoc.get("en-US");
if (enUs.equalsIgnoreCase(exceptTag)) {
mPlusExceptTag = true;
}
}
}
hm.put("is_marketplace_plus", (mPlusTag != null && mPlusExceptTag == null));
}
}
di.remove();
}
si.remove();
}
return hm;
"""
}
},
"doc_count_filter": {
"bucket_selector": {
"buckets_path": {
"count": "doc_count"
},
"script": "params.count == 2"
}
}
}
},
"frequency": "10m",
"sync": {
"time": {
"field": "@timestamp",
"delay": "60s"
}
}
}
Here is the request body used in golang that returns missing documents:
transformConfig.Request = KeyValue{
"source": KeyValue{
"index": []string{
productSourceIndexName,
GetUniqueTransformName(sourceIndexName),
},
},
"dest": KeyValue{
"index": transformConfig.Name,
},
"pivot": KeyValue{
"group_by": KeyValue{
"product_pk": KeyValue{
"terms": KeyValue{
"field": "product_pk",
},
},
},
"aggregations": KeyValue{
"doc_count": KeyValue{
"value_count": KeyValue{
"field": "product_pk",
},
},
"products": KeyValue{
"scripted_metric": KeyValue{
"init_script": "state.docs = []",
"map_script": "state.docs.add(new HashMap(params['_source']))",
"combine_script": "return state.docs",
"reduce_script": `
def hm = new HashMap();
def si = states.iterator();
while (si.hasNext()) {
def s = si.next();
def di = s.iterator();
while (di.hasNext()) {
def d = di.next();
def tagsList = new ArrayList();
def exceptTag = "Plus_except_` + strconv.FormatInt(programPk, 10) + `";
def plusTag = "Plus";
def mPlusTag;
def mPlusExceptTag;
for (v in d.entrySet()) {
if ("catalog_date_created".equals(v.getKey()) || "last_image_upload_date_utc".equals(v.getKey())) {
def value = (String)v.getValue();
value = value.substring(0, (int)Math.min(value.length(), 19));
value = value.replace(' ', 'T');
if (!value.endsWith("Z") && value.length() == 19) {
value += "Z";
}
if (value.length() == 0) {
// Hack to get parsing to work. Note, now does not work in painless.
value = "2019-07-17T12:13:14Z";
}
hm.put(v.getKey(), value);
} else {
hm.put(v.getKey(), v.getValue());
}
if ("tags".equals(v.getKey())) {
tagsList = v.getValue();
mPlusTag = null;
for (tag in tagsList) {
def tagLoc = tag.get("name_loc");
if (tagLoc == null) {
continue;
}
def enUs = tagLoc.get("en-US");
if (enUs.equalsIgnoreCase(plusTag)) {
mPlusTag = true;
}
}
if (mPlusTag != null) {
mPlusExceptTag = null;
for (tag in tagsList) {
def tagLoc = tag.get("name_loc");
if (tagLoc == null) {
continue;
}
def enUs = tagLoc.get("en-US");
if (enUs.equalsIgnoreCase(exceptTag)) {
mPlusExceptTag = true;
}
}
}
hm.put("is_marketplace_plus", (mPlusTag != null && mPlusExceptTag == null));
}
}
di.remove();
}
si.remove();
}
return hm;
`,
},
},
"doc_count_filter": KeyValue{
"bucket_selector": KeyValue{
"buckets_path": KeyValue{
"count": "doc_count",
},
"script": "params.count == 2",
},
},
},
},
"frequency": "10m",
"sync": KeyValue{
"time": KeyValue{
"field": "@timestamp",
"delay": "60s", // default
},
},
}
For example, the console approach works for a resulting 22,332 documents with one program. Using golang, the resulting document has a different document count each time. For example, it once had a document count of 5,000. We are using TransformPutTransform golang API.
Does anyone have any idea why the golang approach would result in missing documents? It should be noted that in the transform page there are no error messages from either transform (console or golang).