Org.elsticsearch -- trying to run scripted upserts, written in painless language, from Spark -- {type=null_pointer_exception, reason=null}

Hi There,

I am trying to run the following code

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
 // .option("es.nodes.discovery", "false")
 // .option("es.nodes.client.only", "false")
  .option("es.mapping.id", "fileName")
  .option("es.resource","<myIndex>/_doc")
  .option("es.update.script.stored","mp-upsert-with-dup-check-and-revisions")
  .option("es.script.lang","painless")
  .option("es.write.operation", "upsert")
  .option("es.nodes", esURL)
  .mode("append")
  .save("<myIndex>/_doc")

and getting following error

org.elasticsearch.hadoop.rest.EsHadoopRemoteException: illegal_argument_exception: failed to execute script;org.elasticsearch.hadoop.rest.EsHadoopRemoteException: script_exception: runtime error;org.elasticsearch.hadoop.rest.EsHadoopRemoteException: 
{type=null_pointer_exception, reason=null}

My Painless Code is

   {
      "_id" : "mp-upsert-with-dup-check-and-revisions",
      "found" : true,
      "script" : {
        "lang" : "painless",
        "source" : """
                    // Get Old and New Documents
                    def old_source = new HashMap(ctx._source);
                    def new_source = new HashMap(params.newsource);
                    

                    // Duplicate Check
                    def pure_old_source = new HashMap(ctx._source);
                    pure_old_source.remove("ingestion_meta");
                    pure_old_source.remove("_revisions");
                    
                    def pure_new_source = new HashMap(params.newsource);
                    pure_new_source.remove("ingestion_meta");
                    
                    
                    // Noop Check and Revision Tracking
                    if(pure_old_source == pure_new_source){
                      ctx.op = 'none';
                    }else{
                      if(old_source.ingestion_meta != null){
                        if(old_source._revisions == null){
                          new_source._revisions = [];
                        }else{
                          new_source._revisions = new ArrayList(old_source._revisions);
                          old_source.remove("_revisions")
                        }
                        new_source._revisions.add(old_source);
                      }
                      
                    }

                    // Add Ingestion_Meta if missing
                    if(new_source.ingestion_meta == null){
                        new_source.ingestion_meta = new HashMap();
                    }
                    
                    
                    // Add created_at timestamp
                    if(old_source.ingestion_meta == null){
                      new_source.ingestion_meta.created_at = ctx._now;
                    }else{
                      new_source.ingestion_meta.created_at = old_source.ingestion_meta.created_at;
                    }
                    
                    // Add modified_at timestamp
                    new_source.ingestion_meta.modified_at = ctx._now;
                    
                    ctx._source = new_source;

                  """
      }
    }

I think this feature has still not been added to es.hadoop package.

Written on this URL

moving on, it is also the last comment on this URL