How to upsert an initial value into Elasticsearch using Spark?


(Terran Yiu) #1

With HTTP POST, the following request inserts a new document with createtime if it does not exist yet, or updates only lastupdatetime if it does:

curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
"doc": {
    "lastupdatetime": "2015-09-16T18:00:00"
},
"upsert": {
    "createtime": "2015-09-16T18:00:00",
    "lastupdatetime": "2015-09-16T18:00:00"
}
}'
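
For reference, the same request body can be built in Python and serialized with the standard json module, which avoids hand-written JSON slips such as a missing comma. This is only a sketch: build_update_body is a made-up helper name, not part of any client library, and the field names and timestamp are taken from the request above.

```python
import json


def build_update_body(timestamp):
    """Return the JSON body for an update request with a separate upsert document.

    Hypothetical helper for illustration; not part of any Elasticsearch client.
    """
    body = {
        # Merged into the stored document when the id already exists.
        "doc": {"lastupdatetime": timestamp},
        # Indexed as the whole document when the id does not exist yet.
        "upsert": {"createtime": timestamp, "lastupdatetime": timestamp},
    }
    return json.dumps(body, indent=4)


print(build_update_body("2015-09-16T18:00:00"))
```

The printed string can be passed to curl with -d in place of the literal body.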

But in a Spark script, after setting "es.write.operation": "upsert", I don't know how to insert createtime at all. The official documentation only mentions es.update.script.*... So, can anyone give me an example?


(eliasah) #2

I'm not entirely sure about that, but what I would try is setting es.mapping.id in the SparkConf to the field that holds the key of the document you want to upsert.

val conf = new SparkConf()
[...]
conf.set("es.write.operation", "upsert")
// Set the name of the document field/property that contains the document id.
// Replace <id> with the desired field name.
conf.set("es.mapping.id", "<id>")

I haven't tried this but I think that it should work!

Let us know if it works for you!


(Terran Yiu) #3

Thank you for your help.

In my case, I want to save information about Android devices from logs into one Elasticsearch type, and set each device's first appearance time as createtime.

If the device appears again, only lastupdatetime should be updated, leaving createtime as it was. The document id is the Android ID: if the id exists, update lastupdatetime; otherwise insert createtime and lastupdatetime. So the settings here are (in Python):

    conf = {
        "es.resource.write": "stats-device/activation",
        "es.nodes": "NODE1:9200",
        "es.write.operation": "upsert",
        "es.mapping.id": "id"
        # ???
    }
 
    rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=conf
    )

I don't know how to insert a new field if the id does not exist...


(eliasah) #4

The id doesn't exist where? In Elasticsearch, or in the data you are reading?


(Terran Yiu) #5

Well, id is just in the input data.


(eliasah) #6

Then, theoretically speaking, it should only be available in your data when you perform an upsert.


(Terran Yiu) #7

So you mean I can't upsert any new field that is not in my data when I use Spark? OK, I got it... Thank you.


(eliasah) #8

I didn't say that. Let's first agree on the definition of an upsert action, which is the following:

If the document does not already exist, the contents of the upsert element will be inserted as a new document. If the document does exist, then the script will be executed instead.

In your case, that means:

  • update on id, since you defined es.mapping.id -> id
  • insert a document with _id equal to id
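
To make the two branches concrete, here is a small in-memory model of those semantics. It is a sketch only: a plain dict stands in for the index, apply_update is a hypothetical function rather than an Elasticsearch or elasticsearch-hadoop call, and it models the doc-plus-upsert form of the request from the first post, not the script form quoted above.

```python
def apply_update(index, doc_id, doc, upsert):
    """Model an update with an upsert document on a dict-based 'index'.

    Hypothetical helper: `index` maps document ids to source dicts.
    """
    if doc_id in index:
        # Document exists: merge the partial doc into the stored source.
        index[doc_id].update(doc)
    else:
        # Document is missing: store the upsert document as-is.
        index[doc_id] = dict(upsert)
    return index[doc_id]


index = {}
# First sighting of the device: nothing stored yet, so the upsert body is inserted.
apply_update(index, "xxxxx",
             {"lastupdatetime": "2015-09-16"},
             {"createtime": "2015-09-16", "lastupdatetime": "2015-09-16"})
# Second sighting: the document exists, so only lastupdatetime changes.
apply_update(index, "xxxxx",
             {"lastupdatetime": "2015-09-20"},
             {"createtime": "2015-09-20", "lastupdatetime": "2015-09-20"})
print(index["xxxxx"])
# {'createtime': '2015-09-16', 'lastupdatetime': '2015-09-20'}
```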

(Terran Yiu) #9

Sorry, I couldn't log in to this site yesterday.

You are right. The requirement of my case is just:

  1. Update lastupdatetime if the id already exists in Elasticsearch;
  2. Insert lastupdatetime and createtime=lastupdatetime when the id does not exist in Elasticsearch.

The source doc is just like this:

{
    'id': 'xxxxx',
    'lastupdatetime': '2015-09-20'
}

The problem is that a new field, createtime, has to be added to the source doc if the id does not exist in ES yet. I don't know how to solve this problem in Spark.

Here is a solution which is not perfect:

  1. add createtime to every source doc (rdd.map(lambda d: dict(d, createtime=d['lastupdatetime'])))
  2. save to es with create and ignore 409
  3. remove createtime field
  4. save to es again with update
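
The four steps above can be modeled on a plain dict to check that they give the intended result. This is a sketch under stated assumptions: two_pass_save is a hypothetical name, the dict stands in for the Elasticsearch type, and skipping an existing id plays the role of ignoring a 409 response. It does not call Spark or elasticsearch-hadoop.

```python
def two_pass_save(index, docs):
    """Model the create-then-update workaround on a dict keyed by document id."""
    # Step 1: copy lastupdatetime into createtime without mutating the input docs.
    with_create = [dict(d, createtime=d["lastupdatetime"]) for d in docs]
    # Step 2: "create" pass; an id that already exists is skipped,
    # which is the in-memory analogue of ignoring a 409 conflict.
    for d in with_create:
        if d["id"] not in index:
            index[d["id"]] = dict(d)
    # Steps 3 and 4: "update" pass using the original docs (no createtime),
    # so only lastupdatetime is overwritten.
    for d in docs:
        index[d["id"]].update(d)
    return index


index = {"old": {"id": "old", "createtime": "2015-09-01", "lastupdatetime": "2015-09-01"}}
two_pass_save(index, [
    {"id": "old", "lastupdatetime": "2015-09-20"},
    {"id": "new", "lastupdatetime": "2015-09-20"},
])
print(index["old"]["createtime"], index["new"]["createtime"])
```

The existing device keeps its original createtime, while the new device gets createtime equal to its first lastupdatetime.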

After these steps, I get what I want. But is there a better solution?


(eliasah) #10

What is the structure of your final rdd before writing it to es?


(Terran Yiu) #11

As in my last post, I currently write the docs to ES twice.
The first time, using create, the RDD structure is

{
    'id': 'xxxxx',
    'lastupdatetime': '2015-09-20',
    'createtime':'2015-09-20',
}

The second time, using update, the RDD structure is

{
    'id': 'xxxxx',
    'lastupdatetime': '2015-09-20'
}

So, if the id already exists, create will fail and only lastupdatetime will be updated. However, I believe these two operations can be combined into one.
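
One possible way to combine them, offered as an unverified sketch rather than a confirmed answer: the elasticsearch-hadoop configuration documents es.update.script.inline, es.update.script.lang and es.update.script.params in later releases. The assumption here, which I have not tested against any specific version, is that with es.write.operation set to upsert and a script configured, the connector sends the full document as the upsert body and runs the script only when the id already exists. The resource, node, and field names are taken from the settings earlier in this thread; with_createtime is a hypothetical helper.

```python
# Assumption: a later elasticsearch-hadoop release with scripted upsert support
# and a cluster that accepts Painless scripts. Not verified in this thread.
single_pass_conf = {
    "es.resource.write": "stats-device/activation",
    "es.nodes": "NODE1:9200",
    "es.write.operation": "upsert",
    "es.mapping.id": "id",
    # Intended to run only when the document already exists: touch lastupdatetime alone.
    "es.update.script.inline": "ctx._source.lastupdatetime = params.lastupdatetime",
    "es.update.script.lang": "painless",
    # Bind the script parameter to the lastupdatetime field of each document.
    "es.update.script.params": "lastupdatetime:lastupdatetime",
}


def with_createtime(doc):
    """Return a copy of doc with createtime set to its lastupdatetime (hypothetical helper)."""
    return dict(doc, createtime=doc["lastupdatetime"])


# Every document carries createtime; under the assumption above it would be
# stored only on first insert and left untouched by the script afterwards.
print(with_createtime({"id": "xxxxx", "lastupdatetime": "2015-09-20"}))
```

If the assumption holds, mapping the RDD through with_createtime and saving once with single_pass_conf would replace both writes.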


(Terran Yiu) #12

In the end, I found that elasticsearch-hadoop doesn't support "create if not exists, else do nothing".


(Terran Yiu) #13

From this post, I decided to build the elasticsearch-hadoop jar myself.


(Costin Leau) #14

Sorry for the late reply.
It seems the update support is not complete. I've seen you already raised an issue (great!) here, so let's use that to track progress.

Enhancing the doc only to do another update is far from the proper solution. And doing bulk requests which create exceptions which later on are ignored is even worse.
This should be properly fixed in one call not 3 plus exceptions.

