Let's create three records and save them to ES:
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.rdd.Metadata.ID   // metadata key for the document _id

case class Device(id: String, name: String)

// esConfig is a Map[String, String] of connector settings (es.nodes, etc.)
val df = sc.makeRDD(Seq(
  (Map(ID -> 1), Device("a", "Apple")),
  (Map(ID -> 2), Device("b", "Banana")),
  (Map(ID -> 3), Device("c", "Carrot"))
))
EsSpark.saveToEsWithMeta(df, "jstest/device", esConfig)
Now we can read the records back to verify them:
val df2 = EsSpark.esRDD(sc, "jstest/device", esConfig)
df2.take(3).foreach(println)
(1,Map(id -> a, name -> Apple))
(2,Map(id -> b, name -> Banana))
(3,Map(id -> c, name -> Carrot))
Now let's say we have some new information for each of these documents, so we do this:
val df = sc.makeRDD(Seq(
  (Map(ID -> 1), Map("Skate" -> "board")),
  (Map(ID -> 2), Map("Golf" -> "club"))
))
EsSpark.saveToEsWithMeta(df, "jstest/device", esConfig)
However, we end up overwriting these documents instead of appending the new attributes to them:
val df2 = EsSpark.esRDD(sc, "jstest/device", esConfig)
df2.take(3).foreach(println)
(1,Map(Skate -> board))
(2,Map(Golf -> club))
(3,Map(id -> c, name -> Carrot))
The Elasticsearch _bulk API has an update action. For example, we can do an initial bulk upload:
POST devices-v1/device/_bulk
{"index":{"_id":"3FB5CE7C0B7A"}}
{"worstgolfer":"Dave"}
And the document looks like this:
GET /devices-v1/device/3FB5CE7C0B7A/_source
{
"worstgolfer": "Dave"
}
Now we can add a new key/value pair to that document with a partial update:
POST devices-v1/device/_bulk
{ "update" : {"_id" : "3FB5CE7C0B7A"} }
{ "doc" : {"best-golfer":"Spooner"} }
{"update":{"_id":"A5584682386F"}}
{ "doc" : {"best-golfer":"Spooner"}, "doc_as_upsert" : true }
And you can see we didn't overwrite the original document:
GET /devices-v1/device/3FB5CE7C0B7A/_source
{
"worstgolfer": "Dave",
"best-golfer": "Spooner"
}
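
For reference, issuing that same partial-update bulk request directly from Scala is straightforward with the JDK HTTP client (Java 11+). This is only a rough sketch, not something I've wired into the Spark job; the host, index/type, and IDs are just the placeholders from the example above.

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// NDJSON body matching the update example above; a _bulk body must end with a newline
val bulkBody =
  """{ "update" : {"_id" : "3FB5CE7C0B7A"} }
    |{ "doc" : {"best-golfer":"Spooner"} }
    |{ "update" : {"_id" : "A5584682386F"} }
    |{ "doc" : {"best-golfer":"Spooner"}, "doc_as_upsert" : true }
    |""".stripMargin

val request = HttpRequest.newBuilder(URI.create("http://localhost:9200/devices-v1/device/_bulk"))
  .header("Content-Type", "application/x-ndjson")
  .POST(HttpRequest.BodyPublishers.ofString(bulkBody))
  .build()

val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
println(response.body())   // per-item results, including any update failures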
So what can I do to get the es-hadoop library to issue "update" actions instead of "index" actions in the _bulk API?
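
One thing I'm wondering is whether the connector's write-operation setting covers this. Something like the following is what I have in mind; this is a sketch only, and I haven't confirmed that es.write.operation together with saveToEsWithMeta produces the partial-document merge shown above rather than a full re-index:

// Sketch: ask es-hadoop to emit "upsert" (or "update") bulk actions instead of "index".
// The document _id still comes from the ID metadata key, as before.
val upsertConfig = esConfig ++ Map("es.write.operation" -> "upsert")

val updates = sc.makeRDD(Seq(
  (Map(ID -> 1), Map("Skate" -> "board")),
  (Map(ID -> 2), Map("Golf" -> "club"))
))
EsSpark.saveToEsWithMeta(updates, "jstest/device", upsertConfig)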
My other idea was to use _mget to fetch all the existing documents and then merge the new values into each result, but I'm not sure whether _mget is supported.
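
Short of _mget, a fallback I'm picturing is a plain read-merge-write using the esRDD call that already works above: read the documents back, join on id, merge the maps, and save everything again. A rough sketch, assuming the ids come back from esRDD as strings:

// Read-merge-write fallback: fetch current docs, fold in the new attributes, re-index.
val existing = EsSpark.esRDD(sc, "jstest/device", esConfig)   // (docId, Map[String, AnyRef])

val newAttrs = sc.makeRDD(Seq(
  ("1", Map("Skate" -> "board")),
  ("2", Map("Golf" -> "club"))
))

val merged = existing.leftOuterJoin(newAttrs).map { case (id, (doc, extra)) =>
  (Map(ID -> id), doc ++ extra.getOrElse(Map.empty[String, String]))   // keep old fields, add new ones
}
EsSpark.saveToEsWithMeta(merged, "jstest/device", esConfig)

The obvious downside is a full read of the index for every incremental update, which is why I'd prefer a real update/upsert path.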
I'd also like to hear how people are handling updates in their applications.