Using painless in watcher

alerting

(Nayun Oh) #1

I want to use Painless in a watch to add or change the values of some fields.
But when I added this code to the watch,
"transform": {
  "script": {
    "source": "ctx._source.period = ctx._source.duration/1000",
    "lang": "painless"
  }
}
I got this error.

"root_cause": [
  {
    "type": "script_exception",
    "reason": "runtime error",
    "script_stack": [
      "ctx._source.period = ctx._source.duration/1000",
      " ^---- HERE"
    ],
    "script": "ctx._source.period = ctx._source.duration/1000",
    "lang": "painless"
  }
],

Could you give me some advice for this?


(Alexander Reelsen) #2

The field ctx._source does not exist within the watch execution context. If you want to know which fields are available, you could use a logging action and write out the whole ctx variable via mustache. You are most likely looking for ctx.payload, which contains the input data.


(Guillaume Dufrenne) #3

You have to check your JSON response.
For an http input it looks like this:

[Screenshot of the JSON response; image not preserved]

Most of the time, to get the source of the document hits you have to do this:
ctx.payload.hits.hits.0._source.period
This syntax accesses the document at position 0.
For this to work, "size" in your query must be 1 or greater.

Whatever you do in this script, it will not change the documents stored in Elasticsearch.
You have to index the result with an index action: Index action documentation


(Nayun Oh) #4

Thank you for your good advice.
Here is my code.
What I actually want is to change the value of a field using Watcher.

{
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "aaa"
        ],
        "types": [],
        "body": {
          "query": {

          }
        }
      }
    }
  },
  "condition": {
    "always": {}
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": {
          "source": "ctx.payload.hits.hits._source.duration = ctx.payload.hits.hits._source.duration / 1000",
          "lang": "painless"
        }
      },
      "index": {
        "index": "bbb",
        "doc_type": "bbb_type"
      }
    }
  }
}

But it gives me the error below. Thank you in advance!

"error": {
"root_cause": [
{
"type": "script_exception",
"reason": "runtime error",
"script_stack": [
"ctx.payload.hits.hits._source.duration = ctx.payload.hits.hits._source.duration / 1000",
" ^---- HERE"
],
"script": "ctx.payload.hits.hits._source.duration = ctx.payload.hits.hits._source.duration / 1000",
"lang": "painless"
}
],
"type": "script_exception",
"reason": "runtime error",
"script_stack": [
"ctx.payload.hits.hits._source.duration = ctx.payload.hits.hits._source.duration / 1000",
" ^---- HERE"
],
"script": "ctx.payload.hits.hits._source.duration = ctx.payload.hits.hits._source.duration / 1000",
"lang": "painless",
"caused_by": {
"type": "illegal_argument_exception",
"reason": "Illegal list shortcut value [_source]."
}
}
},
"reason": "Failed to transform payload"
}


(Guillaume Dufrenne) #5

I think it would be easier for you to use a scripted field in the Kibana index pattern (see Scripted field) and then do this:

return doc['duration'].value/1000;


(Nayun Oh) #6

Thank you for the good advice. But I have no choice but to use Watcher, because I need to do this periodically.

I've changed the code as shown below.

{
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "aaa"
        ],
        "types": [],
        "body": {
          "query": {
            "bool": {
              "must": [
                {
                  "range": {
                    "new_time": {
                      "gte": "now-1d"
                    }
                  }
                },
                {
                  "range": {
                    "new_time": {
                      "lt": "now"
                    }
                  }
                }
              ]
            }
          }
        }
      }
    }
  },
  "condition": {
    "always": {}
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": {
          "source": "for(int j=0;j<ctx.payload.hits.total;j++){ctx.payload.hits.hits[j]._source.duration = ctx.payload.hits.hits[j]._source.duration / 1000}",
          "lang": "painless"
        }
      },
      "index": {
        "index": "bbb",
        "doc_type": "bbb"
      }
    }
  }
}

And it's giving me the error below.

 "type": "script_exception",
        "reason": "runtime error",
        "script_stack": [
          "java.util.ArrayList.rangeCheck(ArrayList.java:657)",
          "java.util.ArrayList.get(ArrayList.java:433)",
          "ctx.payload.hits.hits[j]._source.duration = ctx.payload.hits.hits[j]._source.duration / 1000}",
          "                     ^---- HERE"
        ],
        "script": "for(int j=0;j<ctx.payload.hits.total;j++){ctx.payload.hits.hits[j]._source.duration = ctx.payload.hits.hits[j]._source.duration / 1000}",
        "lang": "painless",
        "caused_by": {
          "type": "index_out_of_bounds_exception",
          "reason": "Index: 10, Size: 10"
        }
      }
    },
    "reason": "Failed to transform payload"

Thank you in advance. You've been a big help!


(Guillaume Dufrenne) #7

Sorry, but ctx.payload.hits.total is the number of documents found by your Elasticsearch query, which is different from the size of the array ctx.payload.hits.hits. The error "Index: 10, Size: 10" shows that only 10 hits were returned (the default size) while the loop kept counting towards hits.total.

Your script should be:

for(int j=0;j<ctx.payload.hits.hits.size();j++){ctx.payload.hits.hits[j]._source.duration = ctx.payload.hits.hits[j]._source.duration / 1000}

And as I said before, in your query you have to set the "size" parameter, for example "size": 40. This parameter sets the maximum number of hits to be returned.
More information here: Size
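
To make the difference between hits.total and the returned hits concrete, here is a minimal Python sketch. The payload dict is a made-up stand-in for ctx.payload (the values are hypothetical, not real Watcher output), and scale_durations is an illustrative helper name, not part of any Elasticsearch API.

```python
# Hypothetical stand-in for ctx.payload of a search input:
# 1537 documents match, but only 10 hits are returned (the default size).
payload = {
    "hits": {
        "total": 1537,
        "hits": [
            {"_id": str(i), "_source": {"duration": 5000 + i}}
            for i in range(10)
        ],
    }
}


def scale_durations(payload, divisor=1000):
    """Divide the duration of every returned hit in place.

    The loop is bounded by the hits actually returned, never by hits.total.
    """
    hits = payload["hits"]["hits"]
    for hit in hits:
        hit["_source"]["duration"] = hit["_source"]["duration"] / divisor
    return len(hits)


processed = scale_durations(payload)
print(processed, "hits processed out of", payload["hits"]["total"], "matches")
```

Note that Python's / always produces a float, whereas Painless follows Java and truncates when both operands are integers, so the exact results can differ from what the watch computes.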


(Nayun Oh) #8

Thank you for your kind answer.
I've changed my code like you said. It is working well now.
I can see the result as follows.

            "message": "45.32.120.142 - - [13/Sep/2018:13:56:21 +0900] \"GET /js/czjl.js HTTP/1.1\" 200 119 17394 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.101 Safari/537.36 QQBrowser/4.3.4986.400\" \"-\"",
            "time_zone": "+0900",
            "tags": [
              "beats_input_codec_plain_applied"
            ],
            "minute": "56",
            "@timestamp": "2018-09-19T01:50:35.657Z",
            "month": "Sep",
            "time": "2018-Sep-13 13:56:21"
          },
          "_id": "ifuF72UBMz99rcsrpVUG",
          "_score": 3
        }
      ],
      "total": 1537,
      "max_score": 3
    },
    "took": 3,
    "timed_out": false
  },
  "search": {
    "request": {
      "search_type": "query_then_fetch",
      "indices": [
        "apache_sid"
      ],
      "types": [],
      "body": {
        "query": {
          "bool": {
            "must": [
              {
                "range": {
                  "new_time": {
                    "gte": "now-15d"
                  }
                }
              },
              {
                "range": {
                  "new_time": {
                    "lt": "now"
                  }
                }
              }                 
            ]
          }
        }
      }
    }
  }
},
"condition": {
  "type": "always",
  "status": "success",
  "met": true
},
"transform": {
  "type": "script",
  "status": "success",
  "payload": {
    "_value": null
  }
},
"actions": [
  {
    "id": "index_payload",
    "type": "index",
    "status": "success",
    "index": {
      "response": {
        "created": false,
        "result": "updated",
        "id": "bbb_id",
        "version": 47,
        "type": "bbb_type",
        "index": "bbb"
      }
    }
  }
]

},
"messages": []
}

But when I search the bbb index, it gives me the empty result below.
{
  "took": 10,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "bbb",
        "_type": "bbb_type",
        "_id": "bbb_id",
        "_score": 1,
        "_source": {
          "_value": null
        }
      }
    ]
  }
}
It seems that the processing result is not saved in the index, even though I set up the action as follows.

"actions": {
  "index_payload": {
    "index": {
      "index": "bbb",
      "doc_type": "bbb_type",
      "doc_id": "bbb_id"
    }
  }
}

I don't know what is happening right now. You've been a big help! You must be an expert!


(Guillaume Dufrenne) #9

If you want to modify a document, you need to rewrite it by specifying the same ID in the index action.
Let's do it step by step.
First, you need to specify the index action like this:

"actions": {
  "index_payload": {
    "transform": {
      "script": {
        "stored": "the_script"
      }
    },
    "index": {
      "index": "bbb",
      "doc_type": "bbb_type"
    }
  }
}

Your script has to return JSON objects. For example, to index only one document:

POST _scripts/the_script
{
  "script": {
    "lang": "painless",
    "source": """
      def docs = [];
      docs.add(['_id': ctx.payload.hits.hits[0]._id, 'duration': ctx.payload.hits.hits[0]._source.duration / 1000]);
      return ['_doc': docs];
    """
  }
}

So the document with the ID ctx.payload.hits.hits[0]._id in the target index will be replaced by the document returned by the script.
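
The result in #8 shows the transform payload as "_value": null, which is what happens when the transform script loops over the hits but returns nothing: the return value of the script becomes the new payload. Below is a minimal Python sketch of what a transform would have to return to index every returned hit rather than only the first one. The payload is a hypothetical stand-in for ctx.payload, and build_index_payload is an illustrative name, not a Watcher API.

```python
def build_index_payload(payload, divisor=1000):
    """Return a new payload of the form {"_doc": [...]} with one entry per hit.

    Each entry carries the original _id plus the source fields,
    with duration divided by `divisor`.
    """
    docs = []
    for hit in payload["hits"]["hits"]:
        doc = dict(hit["_source"])  # copy so the input payload stays untouched
        doc["duration"] = doc["duration"] / divisor
        doc["_id"] = hit["_id"]
        docs.append(doc)
    return {"_doc": docs}


# Hypothetical search result with two hits.
payload = {
    "hits": {
        "total": 2,
        "hits": [
            {"_id": "a1", "_source": {"duration": 2000, "minute": "56"}},
            {"_id": "b2", "_source": {"duration": 4500, "minute": "57"}},
        ],
    }
}

new_payload = build_index_payload(payload)
print(new_payload)
```

The same shape in Painless would be a list of maps returned under the '_doc' key, built inside a loop over ctx.payload.hits.hits.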


(Nayun Oh) #10

Actually, what I really want to do is create a new index after adding a new field to the documents of the original index.
But whenever I set up the action as follows,
"actions": {
  "index_payload": {
    "index": {
      "index": "bbb"
    }
  }
},

],

I get the error below.
"type": "action_request_validation_exception",
"reason": "Validation Failed: 1: type is missing;"

I am such a beginner. Thank you for your quick answer! You are an angel!


(Alexander Reelsen) #11

Hey,

Unfortunately, mass editing of already indexed data is not something that you should use Watcher for. Let me explain the reason for this: Using watcher you would only be able to retrieve a certain number of documents within one request (by default 10000 unless configured otherwise), which you could then change and reindex. This would however be inefficient, and it also means that you hit a limit once you need to change more than that number of documents.

The proper solution to this would be to use update by query, where you can run a script to update all the affected documents within an index.

An even better solution would of course be to solve this problem at index time. Have you considered using a script processor?

Hope this helps!

--Alex


(Nayun Oh) #12

Thank you for your kind answer! But I need to periodically create a new index after modifying fields of the old index. Could the reindex API be an answer for that, or could you suggest something else?


(Alexander Reelsen) #13

Yes, use reindex instead of update by query.
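
For illustration, a reindex request for this case could look roughly like the body built below. This is a minimal Python sketch, assuming the index names aaa and bbb from earlier in the thread. The helper name build_reindex_body and the divisor parameter are illustrative, not part of Elasticsearch. Unlike in a watch, ctx._source is available inside a reindex script.

```python
import json


def build_reindex_body(source_index, dest_index, divisor=1000):
    """Build a request body for POST _reindex that rescales `duration`."""
    return {
        "source": {"index": source_index},
        "dest": {"index": dest_index},
        "script": {
            "lang": "painless",
            # ctx._source refers to the document being copied.
            "source": "ctx._source.duration = ctx._source.duration / params.divisor",
            "params": {"divisor": divisor},
        },
    }


body = build_reindex_body("aaa", "bbb")
print(json.dumps(body, indent=2))
```

The printed JSON is what would be sent as the body of POST _reindex. A query could be added under "source" to restrict the copy to a time range, as in the watch input above.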


(Nayun Oh) #14

Thank you for your advice. I hope you have a nice day!


(Nayun Oh) #15

Actually, I mean creating a new index, not modifying the old index. Is that what you mean? How can I change the default setting (by default 10000 documents)? I have to create a new index from the old index, not modify it.
"actions": {
"index_payload": {
"index": {
"index": "bbb"
}


(Alexander Reelsen) #16

Just repeating what I stated above: the index action does not support reindex/update-by-query. Those are separate APIs. You could use the webhook action to trigger them.


(Nayun Oh) #17

Thank you for your advice.
How can I change the number of documents which are retrieved within one request when using watcher?

Using watcher you would only be able to retrieve a certain number of documents within one request (by default 10000 unless configured otherwise)

I need to retrieve all the documents. Thank you in advance!!


(Alexander Reelsen) #18

This is done using a scroll search and not possible with watcher currently.

I listed an alternative already above, please explain why that is not suitable in your use-case.

Thanks!


(Nayun Oh) #19

Thank you for your advice.
We need to use Watcher. If we use the reindex API directly, we have to use crontab (we need to run this process periodically),
and that means we have to take care of all the logs generated. That's why we avoid calling the reindex API ourselves.

Can we retrieve all the data if we use the webhook action like you mentioned above?


(Alexander Reelsen) #20

You could use the reindex/update by query API as part of a webhook.
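
A rough sketch of what such a webhook action might look like, written as Python that assembles the action JSON. The host, port, action name reindex_via_webhook and the index names are assumptions for illustration. A secured cluster would also need an auth section, which is left out here.

```python
import json


def build_webhook_action(reindex_body, host="localhost", port=9200):
    """Wrap a reindex request body in a Watcher webhook action (sketch)."""
    return {
        "reindex_via_webhook": {  # hypothetical action name
            "webhook": {
                "scheme": "http",
                "host": host,  # assumed: a node reachable from Watcher
                "port": port,
                "method": "post",
                "path": "/_reindex",
                "headers": {"Content-Type": "application/json"},
                # The webhook body is a string, so the request is serialized.
                "body": json.dumps(reindex_body),
            }
        }
    }


# Assumed reindex request: copy aaa to bbb and rescale duration on the way.
reindex_body = {
    "source": {"index": "aaa"},
    "dest": {"index": "bbb"},
    "script": {
        "lang": "painless",
        "source": "ctx._source.duration = ctx._source.duration / 1000",
    },
}

actions = build_webhook_action(reindex_body)
print(json.dumps({"actions": actions}, indent=2))
```

With this approach the watch only supplies the schedule, and the reindex itself runs inside Elasticsearch, so the limit on the number of hits in the watch payload does not apply.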