Figured it out myself, posting here for posterity.
This code will copy each document from the input
to the target index, adding a inventory_time
with the time the task ran at.
"actions": {
"index_payload": {
"transform": {
"script": {
"source": """
def documents = ctx.payload.hits.hits.stream()
.map(hit -> [
"_index": "daily-inventory",
"_id": hit._id,
"inventory_time": ctx['trigger']['scheduled_time'],
"source": hit._source.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
])
.collect(Collectors.toList());
return [ "_doc" : documents];
""",
"lang": "painless"
}
},
"index": {}
}
}