Org.elasticsearch -- trying to run scripted upserts, written in Painless, from Spark -- {type=null_pointer_exception, reason=null}

Hi There,

I am trying to run the following code:

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
 // .option("es.nodes.discovery", "false")
 // .option("es.nodes.client.only", "false")
  .option("es.mapping.id", "fileName")
  .option("es.resource","<myIndex>/_doc")
  .option("es.update.script.stored","mp-upsert-with-dup-check-and-revisions")
  .option("es.script.lang","painless")
  .option("es.write.operation", "upsert")
  .option("es.nodes", esURL)
  .mode("append")
  .save("<myIndex>/_doc")

and I am getting the following error:

org.elasticsearch.hadoop.rest.EsHadoopRemoteException: illegal_argument_exception: failed to execute script;org.elasticsearch.hadoop.rest.EsHadoopRemoteException: script_exception: runtime error;org.elasticsearch.hadoop.rest.EsHadoopRemoteException: 
{type=null_pointer_exception, reason=null}
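
As far as I can tell, reason=null is just a NullPointerException that carries no message. The script below starts by copying ctx._source and params.newsource into new HashMaps, and HashMap's copy constructor throws exactly that kind of NPE when handed null, which is what would happen if the connector never populates params.newsource. A tiny standalone illustration (my own snippet, not connector code):

import java.util.{HashMap => JHashMap, Map => JMap}

// Copying a null map fails the same way the script would if params.newsource
// were never supplied; depending on the JVM, the exception's message can be
// null, which Elasticsearch then surfaces as reason=null.
val missingParam: JMap[String, Object] = null
try new JHashMap[String, Object](missingParam)
catch { case e: NullPointerException => println(s"type=${e.getClass.getSimpleName}, reason=${e.getMessage}") }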

My Painless code (the stored script) is:

   {
      "_id" : "mp-upsert-with-dup-check-and-revisions",
      "found" : true,
      "script" : {
        "lang" : "painless",
        "source" : """
                    // Get Old and New Documents
                    def old_source = new HashMap(ctx._source);
                    def new_source = new HashMap(params.newsource);
                    

                    // Duplicate Check
                    def pure_old_source = new HashMap(ctx._source);
                    pure_old_source.remove("ingestion_meta");
                    pure_old_source.remove("_revisions");
                    
                    def pure_new_source = new HashMap(params.newsource);
                    pure_new_source.remove("ingestion_meta");
                    
                    
                    // Noop Check and Revision Tracking
                    if(pure_old_source == pure_new_source){
                      ctx.op = 'none';
                    }else{
                      if(old_source.ingestion_meta != null){
                        if(old_source._revisions == null){
                          new_source._revisions = [];
                        }else{
                          new_source._revisions = new ArrayList(old_source._revisions);
                          old_source.remove("_revisions");
                        }
                        new_source._revisions.add(old_source);
                      }
                      
                    }

                    // Add Ingestion_Meta if missing
                    if(new_source.ingestion_meta == null){
                        new_source.ingestion_meta = new HashMap();
                    }
                    
                    
                    // Add created_at timestamp
                    if(old_source.ingestion_meta == null){
                      new_source.ingestion_meta.created_at = ctx._now;
                    }else{
                      new_source.ingestion_meta.created_at = old_source.ingestion_meta.created_at;
                    }
                    
                    // Add modified_at timestamp
                    new_source.ingestion_meta.modified_at = ctx._now;
                    
                    ctx._source = new_source;

                  """
      }
    }
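
To rule out the script itself, it can be exercised outside es-hadoop by calling the _update API directly with scripted_upsert and an explicit newsource parameter. A hedged sketch using the JDK 11+ HTTP client, reusing esURL from above; the index name, document id, sample document and the 7.x-style /<index>/_update/<id> endpoint are all placeholders/assumptions on my side:

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Sketch only: POST <esURL>/my-index/_update/a.txt with scripted_upsert,
// handing the stored script the newsource parameter it expects.
val body =
  """{
    |  "scripted_upsert": true,
    |  "script": {
    |    "id": "mp-upsert-with-dup-check-and-revisions",
    |    "params": { "newsource": { "fileName": "a.txt" } }
    |  },
    |  "upsert": {}
    |}""".stripMargin

val request = HttpRequest.newBuilder()
  .uri(URI.create(s"$esURL/my-index/_update/a.txt"))
  .header("Content-Type", "application/json")
  .POST(HttpRequest.BodyPublishers.ofString(body))
  .build()

val response = HttpClient.newHttpClient()
  .send(request, HttpResponse.BodyHandlers.ofString())

// If the script is fine when newsource is actually supplied, this prints an
// update/upsert result rather than the null_pointer_exception seen from Spark.
println(response.body())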

I think this feature has still not been added to the es-hadoop package.

This is written at this URL.

Moving on, it is also the last comment on this URL.
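
For completeness, the closest setting I could find in the connector is es.update.script.params, which maps script parameters to individual document fields (or <constant> values), so it does not seem able to pass the whole row as one object the way params.newsource expects. A minimal sketch of that wiring, assuming only the fileName column is exposed (I left out the lang option since the stored script already declares painless):

// Sketch only: es.update.script.params takes "param:field" (or "param:<constant>")
// pairs, so at best a single column such as fileName reaches the script --
// not the full document that params.newsource is meant to hold.
df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only", "true")
  .option("es.mapping.id", "fileName")
  .option("es.write.operation", "upsert")
  .option("es.update.script.stored", "mp-upsert-with-dup-check-and-revisions")
  .option("es.update.script.params", "newsource:fileName")
  .option("es.nodes", esURL)
  .mode("append")
  .save("<myIndex>/_doc")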
