Is there an API in Spark, Spark Streaming, or Spark Structured Streaming that I can use to upsert documents in Elasticsearch with a script?
Elasticsearch-Hadoop (ES-Hadoop) has the saveToEs function, but I cannot figure out which parameter to use to set the script.
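For reference, saveToEs takes a target resource plus a configuration map rather than a script argument, so the script is set through ES-Hadoop configuration keys. In PySpark the same keys go through the `org.elasticsearch.spark.sql` data source. A minimal batch sketch, assuming ES-Hadoop 6.x or later (where the inline script key is `es.update.script.inline`; older releases used `es.update.script`), a Painless script, and an `id` column holding the document id. The helpers `es_upsert_options` and `upsert_batch` are hypothetical names, not part of ES-Hadoop.

```python
def es_upsert_options(resource, id_field, script, lang="painless"):
    """Build ES-Hadoop settings for a scripted upsert (hypothetical helper)."""
    return {
        "es.resource": resource,            # target "index/doc_type"
        "es.write.operation": "upsert",     # insert if missing, update if present
        "es.mapping.id": id_field,          # column used as the document _id
        "es.update.script.inline": script,  # runs only when the document exists
        "es.update.script.lang": lang,
    }


def upsert_batch(df, options):
    """Write a batch (non-streaming) DataFrame to Elasticsearch with these settings."""
    (df.write
       .format("org.elasticsearch.spark.sql")
       .options(**options)  # keys containing dots are fine as **kwargs
       .mode("append")
       .save())


opts = es_upsert_options("index/doc_type", "id", "ctx._source.view += 1")
```

In Scala, the same map can be passed directly as the second argument of saveToEs.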
The Structured Streaming APIs work with DataFrames. I found this API that writes a DataFrame to ES:
queryES = df \
    .writeStream \
    .format("org.elasticsearch.spark.sql") \
    .queryName("ESquery") \
    .option("es.resource", "index/doc_type") \
    .option("checkpointLocation", "checkpoint") \
    .start()
but I cannot figure out where to specify the parameters for the upsert.
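With Structured Streaming, the same ES-Hadoop settings can be passed as `.option(...)` calls on the `writeStream` builder, next to `es.resource`. A minimal sketch, assuming ES-Hadoop 6.x or later (which added the Structured Streaming sink and the `es.update.script.inline` key), a Painless script, and an `id` column in `df` holding the document id. `start_es_upsert_stream` is a hypothetical helper name.

```python
def start_es_upsert_stream(df, resource="index/doc_type", id_col="id",
                           script="ctx._source.view += 1",
                           checkpoint="checkpoint"):
    """Start a streaming query that upserts each row into Elasticsearch."""
    return (df.writeStream
              .format("org.elasticsearch.spark.sql")
              .queryName("ESquery")
              .outputMode("append")  # assumption: the ES sink supports append mode
              .option("checkpointLocation", checkpoint)
              .option("es.resource", resource)
              .option("es.mapping.id", id_col)          # row column used as _id
              .option("es.write.operation", "upsert")   # insert or update
              .option("es.update.script.inline", script)
              .option("es.update.script.lang", "painless")
              .start())
```

With `es.write.operation` set to `upsert`, a row whose id is not indexed yet is inserted as-is, and the script runs only when the document already exists. This mirrors the `"upsert": msg` body in the update call. If the rows have no `view` column, the first increment will likely fail on the missing field, so include an initial value such as 0.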
I would like to upsert documents the way this Python code does: it inserts a document if it is not already in the index, or increments a counter if it is:
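import json

# consumer yields messages with a JSON .value (e.g. a Kafka consumer);
# es is an elasticsearch.Elasticsearch client
for message in consumer:
    msg = json.loads(message.value)
    print(msg)
    index = INDEX_NAME
    es_id = msg["id"]
    # New id: index msg as-is. Existing id: run the script to increment "view".
    script = {"script": "ctx._source.view+=1", "upsert": msg}
    es.update(index=index, doc_type="test", id=es_id, body=script)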
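To pin down what the Spark job has to reproduce, here is a plain-Python model of what this update call does on the Elasticsearch side. It is a simplification: an in-memory dict stands in for the index, and versioning, concurrency conflicts and `scripted_upsert` are ignored. `apply_scripted_upsert` is a hypothetical name.

```python
def apply_scripted_upsert(store, doc_id, upsert_doc):
    """Model of an update with script "ctx._source.view += 1" plus an upsert body."""
    if doc_id not in store:
        # Document missing: the upsert body is indexed as-is; the script does not run.
        store[doc_id] = dict(upsert_doc)
    else:
        # Document present: the script increments the counter.
        store[doc_id]["view"] += 1
    return store[doc_id]


index = {}
apply_scripted_upsert(index, "a", {"id": "a", "view": 0})  # inserted, view == 0
apply_scripted_upsert(index, "a", {"id": "a", "view": 0})  # updated, view == 1
```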