Sending data to an existing Elasticsearch index through Watcher

Hi Guys,

I'm trying to send the output of a Watcher to an existing index in Elasticsearch, but it throws an error with the following description: "java.lang.IllegalArgumentException: Field [_source] is defined both as an object and a field"

Basically, after modifying the data of one index using a transform, the result is sent to another existing index.

"transform":{
"script":
"""
def doc_count = ctx.payload.hits.total;
def unique_hits = [];
def unique_sources = ctx.payload.aggregations.unique_sources.buckets.stream().map(hit->hit.key).collect(Collectors.toList());
def hit;
for(def source : unique_sources){
  for(def i=0; i< doc_count; i++){
    hit = ctx.payload.hits.hits[i];
    
    
    if (hit.containsKey('_index'))
    {
      hit.remove('_index');
    }
    
    if(source==hit._source.source){
      hit['_index'] = "index-for-action_index-test";
      unique_hits.add(hit);
      break;
    }
  }
}


return [ '_doc' : unique_hits ];
"""

},

It's not necessarily related to the existing index, either: I also tried sending the output to a brand-new index, but it gave me the same error.

Actually, at first I faced another issue: "could not execute action [index_payload] of watch [inlined]. [ctx.payload._index] or [ctx.payload._doc._index] were set together with action [index] field. Only set one of them". After following this topic, I arrived at the current issue :frowning:
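For readers hitting that first error: the Watcher `index` action accepts the destination either as the `index` field on the action itself or as an `_index` field carried by each payload document, but not both at once. Since the watch below already sets `"index": "index-for-action_index-test"` on the action, the alternative (a sketch based on my reading of the error message, not code from this thread) would be to drop that field and let each payload document keep its own `_index`:

```
"actions": {
  "index_payload": {
    "index": {}
  }
}
```

With this form, every document in `ctx.payload._doc` must carry a valid `_index` value, or the action will fail.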

Hello @shubhamshd99

  1. Would it be possible to share the full Watch body?

  2. As we do not know the data, would it be possible to replace the index action with:

      "logging": { "logging" : { "text": "{{ctx}}" } }
    

    Then run the Watcher and send us the full JSON response of the Watch execution.
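For anyone following along, the execution response can be obtained with the Execute Watch API; the inline-watch form (which is what `[inlined]` in the earlier error message refers to) is sketched below, with the watch body elided:

```
POST _watcher/watch/_execute
{
  "watch": { ... }
}
```

The JSON response includes the result of every action, including the per-document failures shown later in this thread.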

Thank you in advance.

Hi @Luca_Belluccini
Please find the full body of the Watcher below, along with the results of the logging and index actions.

Watcher:

{
  "trigger": {
    "schedule": {
      "interval" : "5000m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
        "cft_portal_web_access-2020.03.19"
        ],
        "rest_total_hits_as_int": true,
        "body": {
          "size" : 1000,
          "aggregations" : {
            "unique_sources" : {
              "terms" : {
                "field" : "source",
                "size" : 100
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },
  "transform":{
    "script":
    """
    def doc_count = ctx.payload.hits.total;
    def unique_hits = [];
    def unique_sources = ctx.payload.aggregations.unique_sources.buckets.stream().map(hit->hit.key).collect(Collectors.toList());
    

    def hit;
    for(def source : unique_sources){
      for(def i=0; i< doc_count; i++){
        //hit = ctx.payload.hits.hits[i];
        
        if(source==ctx.payload.hits.hits[i]._source.source){
          //.put('_index', 'index-for-action_index-test');
          hit = ctx.payload.hits.hits[i];
          hit.remove('_index');
          unique_hits.add(hit);
          break;
        }
      }
    }
    
    
    //return unique_hits;
    return [ '_doc' : unique_hits ];
    """
  },
  "actions": {
    "index_payload" : {
      "index" : {
        "index" : "index-for-action_index-test"
      }
    },
    "logging": { "logging" : { "text": "{{ctx}}" } }
  }
}

Index action:

{
  "id" : "index_payload",
  "type" : "index",
  "status" : "failure",
  "index" : {
    "response" : [
      {
        "failed" : true,
        "message" : "java.lang.IllegalArgumentException: Field [_source] is defined both as an object and a field.",
        "id" : "mrEh8nABOjhT50Px45Xt",
        "type" : "_doc",
        "index" : "index-for-action_index-test"
      },
      {
        "failed" : true,
        "message" : "java.lang.IllegalArgumentException: Field [_source] is defined both as an object and a field.",
        "id" : "q7Eh8nABOjhT50Px45bt",
        "type" : "_doc",
        "index" : "index-for-action_index-test"
      }
    ]
  }
}

Logging action:
{metadata={app=wps_bn_my, description=PI-001 : Monitor Web Access – Interal, env=non-prod}, watch_id=index-action, payload={_doc=[{_source={host=ldnpsr37510.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/now.barclays.com/logs/now.gem.ldnpsr37510.10643.swebwpsp.web.access.ssl}, _score=1.0}, {_source={host=ldnpsr37511.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/now.barclays.com/logs/now.gem.ldnpsr37511.10643.swebwpsp.web.access.ssl}, _score=1.0}, {_source={host=nykpsr10615.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/now.barclays.com/logs/now.cf.nykpsr10615.10100.swebwpsp.web.access}, _score=1.0}, {_source={host=nykpsr10614.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/now.barclays.com/logs/now.cf.nykpsr10614.10643.swebwpsp.web.access.ssl}, _score=1.0}, {_source={host=nykpsr10593.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/now.barclays.com/logs/now.pi.nykpsr10593.10643.swebwpsp.web.access.ssl}, _score=1.0}, {_source={host=ldnpsr37587.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/now.barclays.com/logs/nowext.gem.ldnpsr37587.10100.swebwpsp.web.access_log}, _score=1.0}, {_source={host=nykpsr10591.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/now.barclays.com/logs/now.pi.nykpsr10591.10643.swebwpsp.web.access.ssl}, _score=1.0}, {_source={host=ldnpsr37585.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/now.barclays.com/logs/nowext.gem.ldnpsr37585.10100.swebwpsp.web.access_log}, _score=1.0}, {_source={host=ldnpsr37510.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/nowws.barclays.com/logs/nowws.gem.ldnpsr37510.10101.swebwpsp.web.access}, _score=1.0}, {_source={host=ldnpsr37511.intranet.barcapint.com, sourcetype=wps.portal.web.access, 
source=/apps/apache/nowws.barclays.com/logs/nowws.gem.ldnpsr37511.10101.swebwpsp.web.access}, _score=1.0}, {_source={host=ldnpsr37511.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/now.barclays.com/logs/now.gem.ldnpsr37511.10100.swebwpsp.web.access}, _score=1.0}, {_source={host=sgppsr00674.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/now.barclays.com/logs/nowext.sgp.sgppsr00674.10100.swebwpsp.web.access}, _score=1.0}, {_source={host=nykpsr10614.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/nowws.barclays.com/logs/nowws.cf.nykpsr10614.10101.swebwpsp.web.access}, _score=1.0}, {_source={host=nykpsr10615.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/nowws.barclays.com/logs/nowws.cf.nykpsr10615.10101.swebwpsp.web.access}, _score=1.0}, {_source={host=nykpsr10591.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/nowws.barclays.com/logs/nowws.pi.nykpsr10591.10101.swebwpsp.web.access}, _score=1.0}, {_source={host=nykpsr10593.intranet.barcapint.com, sourcetype=wps.portal.web.access, source=/apps/apache/nowws.barclays.com/logs/nowws.pi.nykpsr10593.10101.swebwpsp.web.access}, _score=1.0}]}, id=index-action_e25ae02a-3b9d-4738-8b49-5f04f606fe77-2020-04-30T10:07:25.342787Z, trigger={triggered_time=2020-04-30T10:07:25.342746Z, scheduled_time=2020-04-30T10:07:25.342746Z}, vars={}, execution_time=2020-04-30T10:07:25.342787Z}"

Thank you @shubhamshd99 for the details.

This should work:

{
    "trigger": {
      "schedule": {
        "interval": "5000m"
      }
    },
    "input": {
      "search": {
        "request": {
          "search_type": "query_then_fetch",
          "indices": [
            "cft_portal_web_access-2020.03.19"
          ],
          "rest_total_hits_as_int": true,
          "body": {
            "size": 1000,
            "aggregations": {
              "unique_sources": {
                "terms": {
                  "field": "source",
                  "size": 100
                }
              }
            }
          }
        }
      }
    },
    "condition": {
      "compare": {
        "ctx.payload.hits.total": {
          "gt": 0
        }
      }
    },
    "transform": {
      "script": """
def doc_count = ctx.payload.hits.total;
def unique_hits = [];
def unique_sources = ctx.payload.aggregations.unique_sources.buckets.stream().map(hit->hit.key).collect(Collectors.toList());
for(def source : unique_sources){
  for(def i = 0; i< doc_count; i++){
    if(source == ctx.payload.hits.hits[i]._source.source){
      unique_hits.add(ctx.payload.hits.hits[i]._source);
      break;
    }
  }
}
return [ '_doc' : unique_hits ];
    """
    },
    "actions": {
      "index_payload": {
        "index": {
          "index": "index-for-action_index-test"
        }
      },
      "logging": {
        "logging": {
          "text": "{{ctx}}"
        }
      }
    }
  }

Still, I would suggest using a Transform job if you want to "aggregate" the data, one document per source. See https://www.elastic.co/guide/en/elasticsearch/reference/current/transforms.html
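As a rough sketch of that suggestion (the source pattern, destination index, and the aggregation are placeholders, not taken from this thread), a pivot Transform grouping by `source` could look like:

```
PUT _transform/unique-sources
{
  "source": { "index": "cft_portal_web_access-*" },
  "pivot": {
    "group_by": {
      "source": { "terms": { "field": "source" } }
    },
    "aggregations": {
      "doc_count": { "value_count": { "field": "source" } }
    }
  },
  "dest": { "index": "index-for-action_index-test" }
}
```

Unlike the watch, a Transform tracks the source index continuously and maintains one summary document per `source` value in the destination index.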


Thank you @Luca_Belluccini, and apologies for getting back a little late!
But the following line would just add the _source part, right? unique_hits.add(ctx.payload.hits.hits[i]._source);
There are other fields like _id, _score, etc., and I want those to be included as well; that's why I tried to remove only _index instead of adding just _source.
If there is something I missed, please highlight it!

Hello @shubhamshd99

The only error you were making was the following.

The result of your painless script was:

{
  "_doc": [
    { "_source": { "field...": ... }, "_id": ... }
  ]
}

Instead, we need:

{
  "_doc": [
    { "field...": ..., "_id": ... }
  ]
}

If you want to preserve the _id, I suggest doing:

    "transform": {
      "script": """
def doc_count = ctx.payload.hits.total;
def unique_hits = [];
def unique_sources = ctx.payload.aggregations.unique_sources.buckets.stream().map(hit->hit.key).collect(Collectors.toList());
for(def source : unique_sources){
  for(def i = 0; i< doc_count; i++){
    if(source == ctx.payload.hits.hits[i]._source.source){
      ctx.payload.hits.hits[i]._source._id = ctx.payload.hits.hits[i]._id;
      unique_hits.add(ctx.payload.hits.hits[i]._source);
      break;
    }
  }
}
return [ '_doc' : unique_hits ];
    """
    },

There are other ways to do it, such as:

    "transform": {
      "script": """
def doc_count = ctx.payload.hits.total;
def unique_hits = [];
def unique_sources = ctx.payload.aggregations.unique_sources.buckets.stream().map(hit->hit.key).collect(Collectors.toList());
for(def source : unique_sources){
  for(def i = 0; i< doc_count; i++){
    if(source == ctx.payload.hits.hits[i]._source.source){
      def event = ctx.payload.hits.hits[i]._source;
      event._id = ctx.payload.hits.hits[i]._id;
      unique_hits.add(event);
      break;
    }
  }
}
return [ '_doc' : unique_hits ];
    """
    },

They are exactly the same.

Please consider that if you keep the _id, documents will be overwritten whenever the _id values are the same.
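To make that trade-off concrete (a sketch of the behavior, not output from this thread): the index action uses the `_id` field of each payload document as the document id, so re-running the watch replaces existing documents instead of creating new ones. Omitting the id-copying line from the script lets Elasticsearch auto-generate ids instead:

```
// Sketch: omit this line to let Elasticsearch auto-generate document ids.
// That avoids overwrites, but re-executions will then create duplicates.
ctx.payload.hits.hits[i]._source._id = ctx.payload.hits.hits[i]._id;
```

Which behavior is correct depends on whether each watch run should refresh the same set of documents or append a new snapshot.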


Thank you so much for this beautiful explanation :innocent:


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.