Use watcher index action with multiple documents

Figured it out myself, posting here for posterity.

This code will copy each document from the input to the target index, adding a inventory_time with the time the task ran at.

"actions": {
    "index_payload": {
      "transform": {
        "script": {
          "source": """
          def documents = ctx.payload.hits.hits.stream()
            .map(hit -> [
              "_index": "daily-inventory", 
              "_id": hit._id, 
              "inventory_time": ctx['trigger']['scheduled_time'],
              "source": hit._source.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
            ])
            .collect(Collectors.toList());
          return [ "_doc" : documents]; 
          """,
          "lang": "painless"
        }
      },
      "index": {}
    }
  }
1 Like