Using painless in watcher

I want to use painless in watcher to add or change data of some fields.
But when I added this code in this watcher,
"transform": {
"script": {
"source": "ctx._source.period = ctx._source.duration/1000",
"lang": "painless"
}
}
I got this error.

"root_cause": [
{
"type": "script_exception",
"reason": "runtime error",
"script_stack": [
"ctx._source.period = ctx._source.duration/1000",
" ^---- HERE"
],
"script": "ctx._source.period = ctx._source.duration/1000",
"lang": "painless"
}
],

Could you give me some advice for this?

the field ctx._source does not exist within the watch execution context. If you want to know what fields are available you could use a logging action and write out the whole ctx variable via mustache. You are most likely looking for ctx.payload which contains the input data.

1 Like

You have to check your JSON response :slight_smile:
Which looks like this for an http input :

Capture
Most of the time , to get the source of the document hits you have to do this :
ctx.payload.hits.hits.0._source.period
This syntax will access to the document in position 0 ;
But you need to set "size": "<1 or superior>" in your query

Whatever you do in this script , it will not change the data of your documents stored in elasticsearch.
You have to index the result with an index action : Index action documentation

Thank you for your good advice.
Here is my code below.
Actually, I want to change value of field using watcher.

{
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"aaa"
],
"types": [],
"body": {
"query": {

      }
    }
  }
}

},
"condition": {
"always": {}
},
"actions": {
"index_payload": {
"transform": {
"script": {
"source": "ctx.payload.hits.hits._source.duration = ctx.payload.hits.hits._source.duration / 1000",
"lang": "painless"
}
},
"index": {
"index": "bbb",
"doc_type": "bbb_type"
}
}
}
}

but It is giving me this kind of error below. Thank you in advance!!

"error": {
"root_cause": [
{
"type": "script_exception",
"reason": "runtime error",
"script_stack": [
"ctx.payload.hits.hits._source.duration = ctx.payload.hits.hits._source.duration / 1000",
" ^---- HERE"
],
"script": "ctx.payload.hits.hits._source.duration = ctx.payload.hits.hits._source.duration / 1000",
"lang": "painless"
}
],
"type": "script_exception",
"reason": "runtime error",
"script_stack": [
"ctx.payload.hits.hits._source.duration = ctx.payload.hits.hits._source.duration / 1000",
" ^---- HERE"
],
"script": "ctx.payload.hits.hits._source.duration = ctx.payload.hits.hits._source.duration / 1000",
"lang": "painless",
"caused_by": {
"type": "illegal_argument_exception",
"reason": "Illegal list shortcut value [_source]."
}
}
},
"reason": "Failed to transform payload"
}

I think , it would be easier for you to use scripted field in index pattern=> Scripted field and then do this :

return doc['duration'].value/1000;

Thank you for good advice. But I have no choice but to use watcher. Because I need to do this periodically.

I've changed that code like this below.

{
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"aaa"
],
"types": [],
"body": {
"query": {
"bool": {
"must": [
{
"range": {
"new_time": {
"gte": "now-1d"
}
}
},
{
"range": {
"new_time": {
"lt": "now"
}
}
}
]
}
}
}
}
}
},
"condition": {
"always": {}
},
"actions": {
"index_payload": {
"transform": {
"script": {
"source": "for(int j=0;j<ctx.payload.hits.total;j++){ctx.payload.hits.hits[j]._source.duration = ctx.payload.hits.hits[j]._source.duration / 1000}",
"lang": "painless"
}
},
"index": {
"index": "bbb",
"doc_type": "bbb"
}
}
}
}

and It' s giving me this sort of error below.

 "type": "script_exception",
        "reason": "runtime error",
        "script_stack": [
          "java.util.ArrayList.rangeCheck(ArrayList.java:657)",
          "java.util.ArrayList.get(ArrayList.java:433)",
          "ctx.payload.hits.hits[j]._source.duration = ctx.payload.hits.hits[j]._source.duration / 1000}",
          "                     ^---- HERE"
        ],
        "script": "for(int j=0;j<ctx.payload.hits.total;j++){ctx.payload.hits.hits[j]._source.duration = ctx.payload.hits.hits[j]._source.duration / 1000}",
        "lang": "painless",
        "caused_by": {
          "type": "index_out_of_bounds_exception",
          "reason": "Index: 10, Size: 10"
        }
      }
    },
    "reason": "Failed to transform payload"

Thank you in advance. You 've been a big help!!

Sorry but ctx.payload.hits.totalmean the number of documents found by your elasticsearch query and is different of the size of the array ctx.payload.hits.hits

Your script should be :
,
for(int j=0;j<ctx.payload.hits.hits.size();j++){ctx.payload.hits.hits[j]._source.duration = ctx.payload.hits.hits[j]._source.duration / 1000}"

And like i said before ,in you query you have to set the parameter"size":40, this parameter allows you to configure the maximum amount of hits to be returned.
More information there Size

**Thank you for kind answer. **
I've changed my code like you said. It is working well now.
I can see the result as follows.

            "message": "45.32.120.142 - - [13/Sep/2018:13:56:21 +0900] \"GET /js/czjl.js HTTP/1.1\" 200 119 17394 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.101 Safari/537.36 QQBrowser/4.3.4986.400\" \"-\"",
            "time_zone": "+0900",
            "tags": [
              "beats_input_codec_plain_applied"
            ],
            "minute": "56",
            "@timestamp": "2018-09-19T01:50:35.657Z",
            "month": "Sep",
            "time": "2018-Sep-13 13:56:21"
          },
          "_id": "ifuF72UBMz99rcsrpVUG",
          "_score": 3
        }
      ],
      "total": 1537,
      "max_score": 3
    },
    "took": 3,
    "timed_out": false
  },
  "search": {
    "request": {
      "search_type": "query_then_fetch",
      "indices": [
        "apache_sid"
      ],
      "types": [],
      "body": {
        "query": {
          "bool": {
            "must": [
              {
                "range": {
                  "new_time": {
                    "gte": "now-15d"
                  }
                }
              },
              {
                "range": {
                  "new_time": {
                    "lt": "now"
                  }
                }
              }                 
            ]
          }
        }
      }
    }
  }
},
"condition": {
  "type": "always",
  "status": "success",
  "met": true
},
"transform": {
  "type": "script",
  "status": "success",
  "payload": {
    "_value": null
  }
},
"actions": [
  {
    "id": "index_payload",
    "type": "index",
    "status": "success",
    "index": {
      "response": {
        "created": false,
        "result": "updated",
        "id": "bbb_id",
        "version": 47,
        "type": "bbb_type",
        "index": "bbb"
      }
    }
  }
]

},
"messages":
}

But when I search for bbb index. It is giving me empty result below.
{
"took": 10,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1,
"max_score": 1,
"hits": [
{
"_index": "bbb",
"_type": "bbb_type",
"_id": "bbb_id",
"_score": 1,
"_source": {
"_value": null
}
}
]
}
}
It seems that I can't save processing result in index even though I set up the action setting as follows.

"actions": {
"index_payload": {
"index": {
"index": "bbb",
"doc_type": "bbb_type",
"doc_id": "bbb_id"
}
}
}

I don't know what is happening right now. You 've been a big help! You must be an expert!

If you want to modify a document you need to rewrite it by specifying the same ID with index action :
Lets do it step by step :
First , you need to specify the index action like it :

      "actions": {
  "index_payload": {
    "transform": {
      "script": {
        "stored": "the_script"
      }
    },
    "index": {
      "index": "bbb",
      "doc_type": "bbb_type"
    }
  },}

You script have to return JSON objects, for example , indexing only one document :

POST _scripts/the_script
{"script": {
    "lang": "painless",
    "code": """ def docs = []
                docs.add(['_id': ctx.payload.hits.hits[0]._id, "duration":ctx.payload.hits.hits[0]._source.duration / 1000]);
                return ['_doc' : docs];
            """
  }
}

so the document with the id ctx.payload.hits.hits[0]._id will be replaced by the document returned by the script .

1 Like

Actually, What I really want to do is to create new index after adding new field to original index.
But whenever I set up the action setting as follows,
"actions": {
"index_payload": {
"index": {
"index": "bbb"
}
}
},

],

, I get this kind of error below.
"type": "action_request_validation_exception",
"reason": "Validation Failed: 1: type is missing;"

I am such a beginner. Thank you for your quick answer!!. You are an angel~!!

Hey,

unfortunately mass editing of already indexed data is not something that you should use watcher for. Let me explain the reason for this: Using watcher you would only be able to retrieve a certain number of documents within one request (by default 10000 unless configured otherwise), which you could then change and reindex. This would however be inefficient, and also mean, that you are limited once you need to change more than those number of documents.

The proper solution to this would be to use update by query, where you can run a script to update all the affected documents within an index.

The proper solution to this problem would of course be to solve this problem at index time. Have you considered using a script processor?

Hope this helps!

--Alex

1 Like

Thank you for your kind answer! But I need to create new index after modifying fields of old index periodically. Reindex api could be an answer for that or could you suggesst something to me?

yes, use reindex instead of update by query.

Thank you for your advice. I hope you have a nice day!

Actually, I mean creating new index. not modifying old index. Do you mean that? How could I change the default setting (by default 10000 documents)? I have to create new index from old index. not modifying it.
"actions": {
"index_payload": {
"index": {
"index": "bbb"
}

Just repeating what I stated above: the index action does not support reindex/update-by-query. Those are separate APIs. You could use the webhook action do trigger that.

Thank you for your advice.
How can I change the number of documents which are retrieved within one request when using watcher?

Using watcher you would only be able to retrieve a certain number of documents within one request (by default 10000 unless configured otherwise)

I need to retrieve all the documents. Thank you in advance!!

This is done using a scroll search and not possible with watcher currently.

I listed an alternative already above, please explain why that is not suitable in your use-case.

Thanks!

1 Like

Thank you for your advice.
We need to use watcher. If we use reindex api, we have to use crontab (we need to do this process periodically)
and it means that we have to take care of all the logs generated. That's why we avoid using reindex api.

Can we retrieve all the data if we use the webhook action like you mentioned above?

you could use the reindex/update by query API as part of a webhook.