Upsert Elasticsearch documents with Spark

Is there an API in Spark, Spark Streaming or Spark Structured Streaming that I can use to upsert documents in Elasticsearch with a script?

Elasticsearch-Hadoop (ES-Hadoop) has the saveToEs function, but I cannot figure out which parameter to set in order to pass the script field.
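
Looking at the ES-Hadoop configuration reference, my impression is that upserts are driven by the es.write.operation, es.mapping.id and es.update.script.* settings rather than by an argument of saveToEs. For a plain batch DataFrame I would expect something like the sketch below, though I have not verified the exact key names for every connector version (older releases seem to use es.update.script instead of es.update.script.inline):

# Untested sketch: "localhost:9200", "index/doc_type" and the "id"/"view" fields
# are placeholders for my actual cluster, index and mapping.
df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "localhost:9200") \
    .option("es.resource", "index/doc_type") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "upsert") \
    .option("es.update.script.lang", "painless") \
    .option("es.update.script.inline", "ctx._source.view += 1") \
    .mode("append") \
    .save()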

The Structured Streaming APIs work with DataFrames. I found this API that writes a DataFrame to ES:

queryES = df \
    .writeStream \
    .format("org.elasticsearch.spark.sql") \
    .queryName("ESquery") \
    .option("es.resource", "index/doc_type") \
    .option("checkpointLocation", "checkpoint") \
    .start()

but I cannot wrap my head around where to specify the parameters for the upsert.
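
My guess is that the upsert is configured purely through es.* options, so the same keys as for the batch writer would simply be added to the streaming writer, along the lines of the untested sketch below; I am not sure whether the Structured Streaming sink accepts es.write.operation set to upsert in every connector version:

queryES = df \
    .writeStream \
    .format("org.elasticsearch.spark.sql") \
    .queryName("ESquery") \
    .option("es.resource", "index/doc_type") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "upsert") \
    .option("es.update.script.lang", "painless") \
    .option("es.update.script.inline", "ctx._source.view += 1") \
    .option("checkpointLocation", "checkpoint") \
    .start()

Is this the intended way to pass the script, or is there a dedicated parameter for the upsert body?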

I would like to upsert documents like this (here is Python code that inserts a document if it isn't in the index already, or increments a counter if it is):

for message in consumer:
    msg = json.loads(message.value)
    print(msg)
    index = INDEX_NAME
    es_id = msg["id"]
    # increment the view counter if the document exists, otherwise insert the full message
    script = {"script": "ctx._source.view+=1", "upsert": msg}
    es.update(index=index, doc_type="test", id=es_id, body=script)
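
If the connector options turn out not to support scripted upserts from the streaming sink, the fallback I am considering is foreachBatch (available since Spark 2.4) with the elasticsearch-py client, mirroring the loop above. This is an untested sketch; the cluster address, index name and the assumption that each micro-batch is small enough to collect on the driver are all mine:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch(["localhost:9200"])   # assumed cluster address
INDEX_NAME = "index"                     # assumed index name

def upsert_batch(batch_df, batch_id):
    # Collect the micro-batch on the driver (only sensible for small batches)
    # and send one bulk request of scripted upserts, like the es.update call above.
    actions = [
        {
            "_op_type": "update",
            "_index": INDEX_NAME,
            "_type": "test",
            "_id": row["id"],
            "script": "ctx._source.view+=1",
            "upsert": row.asDict(),
        }
        for row in batch_df.collect()
    ]
    bulk(es, actions)

queryES = df.writeStream \
    .foreachBatch(upsert_batch) \
    .option("checkpointLocation", "checkpoint") \
    .start()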
