Transform single hit into multiple documents with Index action

I'd like to take the hits from the watch and insert each one as a new document into a second index. Instead, I'm getting a single document (with three timestamps).

Reading the docs, I know this is supported, but I can't figure out the magical incantation to make it so.

I'm on ES 6.3.2.

{
    "trigger": {
      "schedule": {
        "interval": "10s"
      }
    },
    "input": {
      "search": {
        "request": {
          "search_type": "query_then_fetch",
          "indices": [
            "the_source_index*"
          ],
          "types": [],
          "body": {
            "query": {
              "bool": {
                "must": [
                  {
                    "exists": {
                      "field": "elapsed_time"
                    }
                  },
                  {
                    "term": {
                      "transaction": "remove"
                    }
                  }
                ]
              }
            }
          }
        }
      }
    },
    "condition": {
      "compare": {
        "ctx.payload.hits.total": {
          "gt": 0
        }
      }
    },
    
    "actions": {
      "index_payload": {
        "transform" : { 
          "script" : "return [ '_doc' : ctx.payload.hits.hits];"
        },
        "index": {
          "index": "the_new_index",
          "doc_type": "doc"
        }
      }
    }
}

Thanks in advance for any help you can provide.

Hey,

I just ran this under 6.3.2

PUT foo/doc/_bulk
{ "index" : { "_id" : "1" } }
{ "foo" : "1" }
{ "index" : { "_id" : "2" } }
{ "foo" : "2" }
{ "index" : { "_id" : "3" } }
{ "foo" : "3" }


PUT _xpack/watcher/watch/_execute
{
  "watch" : {
    "trigger": {
      "schedule": {
        "interval": "10h"
      }
    },
    "input": {
      "search": {
        "request": {
          "indices": [
            "foo"
          ],
          "types": [],
          "body": {
            "query": {
              "match_all" : {}
            }
          }
        }
      }
    },
    "condition": {
      "compare": {
        "ctx.payload.hits.total": {
          "gt": 0
        }
      }
    },
    
    "actions": {
      "index_payload": {
        "transform" : { 
          "script" : "return [ '_doc' : ctx.payload.hits.hits];"
        },
        "index": {
          "index": "the_new_index",
          "doc_type": "doc"
        }
      }
    }
  }
}

This does not work. It returns the following error:

"could not execute action [index_payload] of watch [_inlined_]. [ctx.payload._index] or [ctx.payload._doc._index] were set together with action [index] field. Only set one of them"

The reason is that you specified an index in the action, but each hit also contains an _index field, which holds the name of the original index.

This means you need another transform that modifies the structure of the hits array before indexing.

Note that Watcher should not be used for bulk reindexing operations.

Something must be different if you are getting back a single document. Can you please include the output of the execute watch API?

Hi Alexander,

Thank you for the response. My apologies for not including this output in the original post. I was using the GET Watch API for troubleshooting, which, as you know, doesn't provide the same level of detail.

Thanks for the tip on bulk re-indexing. Based on our data, we expect fewer than 5 hits per watch execution. The broad query and the 10s interval are only for my testing.

Here is the output you requested:

{
  "_id": "queue_test_891880ad-b9a0-42e2-a825-ad75ac0206fe-2018-08-20T09:51:34.770Z",
  "watch_record": {
    "watch_id": "queue_test",
    "node": "VBneV9GPRgOP8xBE3JU7RA",
    "state": "executed",
    "status": {
      "state": {
        "active": false,
        "timestamp": "2018-08-19T09:06:18.380Z"
      },
      "last_checked": "2018-08-20T09:51:34.770Z",
      "last_met_condition": "2018-08-20T09:51:34.770Z",
      "actions": {
        "index_payload": {
          "ack": {
            "timestamp": "2018-08-18T16:27:47.749Z",
            "state": "awaits_successful_execution"
          },
          "last_execution": {
            "timestamp": "2018-08-20T09:51:34.770Z",
            "successful": false,
            "reason": ""
          }
        }
      },
      "execution_state": "executed",
      "version": 2290
    },
    "trigger_event": {
      "type": "manual",
      "triggered_time": "2018-08-20T09:51:34.770Z",
      "manual": {
        "schedule": {
          "scheduled_time": "2018-08-20T09:51:34.770Z"
        }
      }
    },
    "input": {
      "search": {
        "request": {
          "search_type": "query_then_fetch",
          "indices": [
            "the_source_index*"
          ],
          "types": [],
          "body": {
            "query": {
              "bool": {
                "must": [
                  {
                    "exists": {
                      "field": "elapsed_time"
                    }
                  },
                  {
                    "term": {
                      "transaction": "remove"
                    }
                  }
                ]
              }
            }
          }
        }
      }
    },
    "condition": {
      "compare": {
        "ctx.payload.hits.total": {
          "gt": 0
        }
      }
    },
    "metadata": {
      "window_period": "1m",
      "threshold": 86400
    },
    "result": {
      "execution_time": "2018-08-20T09:51:34.770Z",
      "execution_duration": 7,
      "input": {
        "type": "search",
        "status": "success",
        "payload": {
          "_shards": {
            "total": 10,
            "failed": 0,
            "successful": 10,
            "skipped": 0
          },
          "hits": {
            "hits": [
              {
                "_index": "the_source_index_2018.08.18",
                "_type": "doc",
                "_source": {
                  "@timestamp": "2018-08-18T15:25:05.502Z",
                  "@version": "1",
                  "elapsed_time": 23.15,
                  "transaction": "remove"
                },
                "_id": "9tOjTWUBcfGrqNHNzk_i",
                "_score": 1.6931472
              },
              {
                "_index": "the_source_index_2018.08.18",
                "_type": "doc",
                "_source": {                  
                  "@timestamp": "2018-08-18T16:16:14.588Z",                 
                  "@version": "1",
                  "elapsed_time": 6.463,                  
                  "transaction": "remove"
                },
                "_id": "Z9PSTWUBcfGrqNHNo1Br",
                "_score": 1.287682
              },
              {
                "_index": "the_source_index_2018.08.17",
                "_type": "doc",
                "_source": {                 
                  "@timestamp": "2018-08-17T16:22:58.431Z",                  
                  "@version": "1",
                  "elapsed_time": 78.142,                  
                  "transaction": "remove"
                },
                "_id": "S9OySGUBcfGrqNHNcU8u",
                "_score": 1.287682
              }
            ],
            "total": 3,
            "max_score": 1.6931472
          },
          "took": 3,
          "timed_out": false
        },
        "search": {
          "request": {
            "search_type": "query_then_fetch",
            "indices": [
              "the_source_index*"
            ],
            "types": [],
            "body": {
              "query": {
                "bool": {
                  "must": [
                    {
                      "exists": {
                        "field": "elapsed_time"
                      }
                    },
                    {
                      "term": {
                        "transaction": "remove"
                      }
                    }
                  ]
                }
              }
            }
          }
        }
      },
      "condition": {
        "type": "compare",
        "status": "success",
        "met": true,
        "compare": {
          "resolved_values": {
            "ctx.payload.hits.total": 3
          }
        }
      },
      "actions": [
        {
          "id": "index_payload",
          "type": "index",
          "status": "failure",
          "error": {
            "root_cause": [
              {
                "type": "illegal_state_exception",
                "reason": "could not execute action [index_payload] of watch [water_works_queue_test]. [ctx.payload._index] or [ctx.payload._doc._index] were set together with action [index] field. Only set one of them"
              }
            ],
            "type": "illegal_state_exception",
            "reason": "could not execute action [index_payload] of watch [water_works_queue_test]. [ctx.payload._index] or [ctx.payload._doc._index] were set together with action [index] field. Only set one of them"
          }
        }
      ]
    },
    "messages": []
  }
}

In my troubleshooting I tried so many things that I lost track, so I should clarify my issue(s):

  • Without the transform in the action, the hits are indexed as a single document, which is the expected behavior.
  • When I add the transform, I see the same error you pointed out:
"type": "illegal_state_exception",
 "reason": "could not execute action [index_payload] of watch [queue_test]. [ctx.payload._index] or [ctx.payload._doc._index] were set together with action [index] field. Only set one of them"

So I'm guessing the fix is to transform the hits to remove the _index key of each hit?

Thanks again for the help and sorry for the confusion.

For folks running into the "could not execute action [index_payload] of watch [_inlined_]. [ctx.payload._index] or [ctx.payload._doc._index] were set together with action [index] field. Only set one of them" issue: the fix is to remove the _index field from each doc you want to insert into the new index.

Thanks again for the help troubleshooting @spinscale!

The trick is to use a script transform in your action that modifies the data and removes the _index from each hit. Off the top of my head, without having tested it:

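// List has no map(), so stream the hits; remove() returns the removed value, so return the hit itself
def hits = ctx.payload.hits.hits.stream().map(hit -> { hit.remove('_index'); return hit; }).collect(Collectors.toList());
return ['_doc' : hits];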

For easier debugging, you could add a logging action with the same transform and run the execute watch API to see what the output looks like.
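
As a rough sketch of that debugging idea, the fragment below pairs a logging action with a stream-based version of the transform suggested above. The action name log_transformed_hits and the log text are made up for illustration, and the script is as untested as the original suggestion. It only shows where the pieces would go.

```json
{
  "actions": {
    "log_transformed_hits": {
      "transform": {
        "script": "def hits = ctx.payload.hits.hits.stream().map(hit -> { hit.remove('_index'); return hit; }).collect(Collectors.toList()); return ['_doc' : hits];"
      },
      "logging": {
        "text": "Transformed payload: {{ctx.payload}}"
      }
    }
  }
}
```

Running the watch through the execute watch API with this action in place should let you inspect the shape of each entry under _doc before pointing an index action at it. If other metadata fields on the hits (for example _type, when doc_type is also set on the action) cause a similar conflict, they would need the same treatment. That part is an assumption and has not been verified here.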

What is the number of hits that would be considered a bulk re-indexing?

See the docs about the size parameter. https://www.elastic.co/guide/en/elasticsearch/reference/6.3/search-request-from-size.html

Note: Even though the parameter is tunable, you are increasing memory requirements by increasing it.
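
For illustration, this is roughly where size would go in the search input from the original watch. The value 5 is only an assumption, based on the "fewer than 5 hits" expectation mentioned earlier in the thread. When size is omitted, a search returns at most 10 hits.

```json
{
  "input": {
    "search": {
      "request": {
        "indices": [
          "the_source_index*"
        ],
        "body": {
          "size": 5,
          "query": {
            "bool": {
              "must": [
                { "exists": { "field": "elapsed_time" } },
                { "term": { "transaction": "remove" } }
              ]
            }
          }
        }
      }
    }
  }
}
```

Keeping size small keeps the payload that the transform and index action have to handle small as well, which fits the advice not to use Watcher for bulk reindexing.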


Thanks again @spinscale!
