Watcher with Index Action - Not writing data properly

Hello, I am trying to create a watcher that scans a log index and once it finds a search term, writes the JSON payload to a new index with an Index action.

I am seeing that the data reaching the index this way is in a different format than when we hit the Elasticsearch endpoint directly from Postman with a PUT request.

The data is properly parsed and inserted when done from Postman, as shown in the snapshot below.

The JSON body sits under _source at the parent level, and hence the parsing works.
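
For reference, a document indexed directly with a PUT looks roughly like this in Kibana (index name, document ID and field values here are just illustrative):

{
  "_index": "tnr-doc-store-dev",
  "_id": "1",
  "_source": {
    "request": {
      "header": {
        "clientId": "AACORN",
        "channel": "ARC"
      }
    }
  }
}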

When the same document is written through the Index action of a watcher, this is how it shows up in Kibana.
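
Roughly, the document written by the watcher's Index action is shaped like this (values are illustrative; note the extra nesting):

{
  "_index": "tnr-doc-store-dev",
  "_id": "abc123",
  "_source": {
    "took": 3,
    "timed_out": false,
    "hits": {
      "total": 1,
      "hits": [
        {
          "_index": "iks-dev-2020.02.01",
          "_source": {
            "app_message": "{\"request\":{\"header\":{\"clientId\":\"AACORN\"}}}"
          }
        }
      ]
    }
  }
}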

As you can see, the actual JSON payload is not at the parent _source but inside hits->hits->_source.

How can I update my watcher script to write the JSON payload at the parent _source and not inside hits->hits->_source?

Any help is greatly appreciated. Thank you so much.

Here is my watcher script:

{
  "trigger": {
    "schedule": {
        "hourly" : { "minute" : [ 0, 5, 10, 15 ,20, 25, 30, 35 ,40, 45, 50, 55 ] }  
      }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "iks-dev*"
        ],
        "types": [],
        "body": {
          "size": 1000,
          "query": {
            "bool": {
              "must": [
                {
                  "match_all": {}
                },
                {
                  "match_phrase": {
                    "app_className": {
                      "query": "RequestAndResponseLogger"
                    }
                  }
                },
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-5m/m"
                    }
                  }
                }
              ],
              "filter": [],
              "should": [],
              "must_not": []
            }
          },
          "_source": [
            "app_message"
          ],
          "sort": [
            {
              "@timestamp": {
                "order": "desc"
              }
            }
          ]
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },
  "actions": {
    "send_email": {
      "email": {
        "profile": "standard",
        "to": [
          "example@gmail.com"
        ],
        "subject": "Encountered {{ctx.payload.hits.total}}  stats(Environment)!",
        "body": {
          "text": " Report \n\n {{#ctx.payload.hits.hits}}{{_source.app_recordLoc}}\n\n{{_source.app_message}}\n\n{{/ctx.payload.hits.hits}} "
        }
      }
    },
	"index_payload" : { 
	  "index" : {
      "index" : "tnr-doc-store-dev", 
      "doc_type" : "_doc" 
    }
  }
  },
  "throttle_period_in_millis": 900000
}

Hello @Bhavani_Prasad

This is normal: the index action will take the whole content of the response and index it.

If you want to index the hits you matched on the input search, you have to perform some preprocessing using a transform.

Before running it again with the code below, please delete the index tnr-doc-store-dev, as you might otherwise end up with a mapping conflict.

"actions": {
      "index_payload": {
        "transform": {
          "script": {
            "source": "[ '_doc': ctx.payload.hits.hits.stream().map(h -> h._source).collect(Collectors.toList()) ];",
            "lang": "painless"
          }
        },
        "index": {
          "index": "tnr-doc-store-dev",
          "doc_type": "_doc"
        }
      }
    }
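
For clarity, the transform above replaces the action payload with something shaped roughly like this (values are illustrative):

{
  "_doc": [
    { "app_message": "first matching log entry" },
    { "app_message": "second matching log entry" }
  ]
}

When the index action finds a _doc field holding an array of objects, it indexes each element as its own document instead of writing the whole payload as a single document.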

Thank you so much. I tried the script below and it gave me an error. Also, I only need the contents of _source.app_message, hence I added that to the transform script.

"index_payload": {
        "transform": {
          "script": {
            "source": "[ '_doc': ctx.payload.hits.hits.stream().map(h -> h._source.app_message).collect(Collectors.toList()) ];",
            "lang": "painless"
          }
        },
        "index": {
          "index": "tnr-doc-store-qa",
          "doc_type": "_doc"
        }
   }

My search returns multiple rows and all of them need to be added to the index.

The script I've provided takes each hit._source and indexes it into the destination index.
I tested it just before sharing it with you.

If you only need one field in the final document, you have to use:

"index_payload": {
        "transform": {
          "script": {
            "source": "[ '_doc': ctx.payload.hits.hits.stream().map(h -> ['app_message' : h._source.app_message ]).collect(Collectors.toList()) ];",
            "lang": "painless"
          }
        },
        "index": {
          "index": "tnr-doc-store-qa",
          "doc_type": "_doc"
        }
   }

Thank you again, getting close.

It's writing the key "app_message" into the index as well, because of which the index is not mapping all the fields of the JSON. I just need the JSON body, i.e. the value of the key "app_message".

From your screenshot, the content of app_message is a **stringified representation of JSON**, not a JSON object.

Painless has no JSON parsing library so it's not possible to convert the string into an actual object.

The only workaround I can think of is using an Ingest Pipeline.

The following solution requires Ingest nodes.

  1. Delete the index tnr-doc-store-dev
DELETE tnr-doc-store-dev
  2. Create an ingest pipeline to decode the JSON strings (you can sanity-check it with the _simulate example at the end of this post)
PUT _ingest/pipeline/jsondecode-app_message
{
  "processors": [
    {
      "json": {
        "field": "app_message",
        "add_to_root": true
      }
    },
    {
      "remove": {
        "field": "app_message"
      }
    }
  ]
}
  3. Create the destination index
PUT tnr-doc-store-dev
  4. Assign the index a default ingest pipeline
PUT tnr-doc-store-dev/_settings
{
  "index.default_pipeline": "jsondecode-app_message"
}
  5. Run the watcher
...
    "actions": {
      "index_payload": {
        "transform": {
          "script": {
            "source": "[ '_doc': ctx.payload.hits.hits.stream().map(h -> [ 'app_message ': h._source.app_message ]).collect(Collectors.toList()) ];",
            "lang": "painless"
          }
        },
        "index": {
          "index": "tnr-doc-store-dev",
          "doc_type": "_doc"
        }
      }
    }
...
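
As a sanity check, you can simulate the pipeline on a small sample document before running the watcher (the app_message value below is just an illustration):

POST _ingest/pipeline/jsondecode-app_message/_simulate
{
  "docs": [
    {
      "_source": {
        "app_message": "{\"request\":{\"header\":{\"clientId\":\"AACORN\",\"channel\":\"ARC\"}}}"
      }
    }
  ]
}

The simulated result should show request at the root of _source, with the app_message field removed.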

Thank you, I tried the steps but I am getting the error: cannot add non-map fields to root of document. Here is the execution output from the index action.


 "actions": [
      {
        "id": "index_payload",
        "type": "index",
        "status": "failure",
        "transform": {
          "type": "script",
          "status": "success",
          "payload": {
            "_doc": [
              {
                "app_message ": "{\"request\":{\"header\":{\"clientId\":\"AACORN\",\"channel\":\"ARC\",\"transactionId\":\"aacd6333-7164-4ec6-a9c5-2a751f5da9a8\",\"host\":\"TSTS\",\"stationId\":null,\"nofepSessionToken\":null,\"swstoken\":null,\"pointOfSale\":\"XTM\",\"recordLocator\"
              }
            ]
          }
        },
        "index": {
          "response": [
            {
              "failed": true,
              "message": "ElasticsearchException[java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: cannot add non-map fields to root of document]; nested: IllegalArgumentException[java.lang.IllegalArgumentException: cannot add non-map fields to root of document]; nested: IllegalArgumentException[cannot add non-map fields to root of document];",
              "id": null,
              "type": "_doc",
              "index": "tnr-doc-store-qa"
            }
          ]
        }
      }


Also, why does it behave differently when writing the same JSON payload from the watcher vs. hitting the Elasticsearch endpoint directly from Postman with a PUT operation? As you can see below, the JSON body is right under _source when we go through Postman. That's exactly what I am looking to achieve through this watcher as well.

You didn't follow the process: you changed the destination index and are writing to tnr-doc-store-qa.

Unfortunately I can't delete tnr-doc-store-dev, hence I have been updating only tnr-doc-store-qa. Below are the exact steps I did:


DELETE tnr-doc-store-qa


PUT _ingest/pipeline/jsondecode-app_message
{
  "processors": [
    {
      "json": {
        "field": "app_message",
        "add_to_root": true
      }
    },
    {
      "remove": {
        "field": "app_message"
      }
    }
  ]
}


PUT tnr-doc-store-qa


PUT tnr-doc-store-qa/_settings
{
  "index.default_pipeline": "jsondecode-app_message"
}


"actions": {
      "index_payload": {
        "transform": {
          "script": {
            "source": "[ '_doc': ctx.payload.hits.hits.stream().map(h -> [ 'app_message ': h._source.app_message ]).collect(Collectors.toList()) ];",
            "lang": "painless"
          }
        },
        "index": {
          "index": "tnr-doc-store-qa",
          "doc_type": "_doc"
        }
      }
    }



After I ran the watcher, I got the error message below:

"index": {
          "response": [
            {
              "failed": true,
              "message": "ElasticsearchException[java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: cannot add non-map fields to root of document]; nested: IllegalArgumentException[java.lang.IllegalArgumentException: cannot add non-map fields to root of document]; nested: IllegalArgumentException[cannot add non-map fields to root of document];",
              "id": null,
              "type": "_doc",
              "index": "tnr-doc-store-qa"
            },

My bad, it worked. The issue was that I had an extra space after app_message in the index action. After I removed it, it worked like a charm and I can see all the data getting properly indexed.

Thank you so so much @Luca_Belluccini for your help. You are amazing!!!


Glad it helped! :sunny:

One last thing: my JSON payload doesn't contain any time field that I can search my data on. Is there a way to write the current timestamp into the index payload so that I can run my time-based searches? Thank you so much.

In the painless transform, use:

...
[ 'app_message': h._source.app_message, '@timestamp': ctx.execution_time ]
...
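
Putting it together with the earlier action, the transform would look roughly like this (index name as in your earlier snippets; ctx.execution_time is the time the watch execution started):

"index_payload": {
  "transform": {
    "script": {
      "source": "[ '_doc': ctx.payload.hits.hits.stream().map(h -> [ 'app_message': h._source.app_message, '@timestamp': ctx.execution_time ]).collect(Collectors.toList()) ];",
      "lang": "painless"
    }
  },
  "index": {
    "index": "tnr-doc-store-qa",
    "doc_type": "_doc"
  }
}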

That worked. Thank you so much.


Hi Luca, I need to add a scripted field which is a sum of two values in my payload.

I created a scripted field as below under my index pattern, but it throws an error.

Double.parseDouble(doc['request.orderGroups.orderItems.totalItemBaseAmount'].value) + Double.parseDouble(doc['request.orderGroups.orderItems.totalItemTaxAmount'].value)

"type": "illegal_argument_exception",
     "reason": "No field found for [request.orderGroups.orderItems.totalItemBaseAmount] in mapping with types []"

Do I need to access the values inside my JSON payload in a different way to get the fields I need for the scripted field?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.