Elasticsearch-hadoop and updating records


(Jonathan Spooner) #1

Let's create 3 records and save them to ES

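import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.rdd.Metadata._

// Assumed shape of Device, inferred from the field names in the printed output
case class Device(id: String, name: String)

// Each element is (metadata, document); the ID metadata becomes the document _id
val df = sc.makeRDD(
  Seq(
    (Map(ID -> 1), Device("a", "Apple")),
    (Map(ID -> 2), Device("b", "Banana")),
    (Map(ID -> 3), Device("c", "Carrot"))
  ))

EsSpark.saveToEsWithMeta(df, "jstest/device", esConfig)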

Now we can verify the records

val df2 = EsSpark.esRDD(sc, "jstest/device", esConfig)
df2.take(3).foreach(println)
(1,Map(id -> a, name -> Apple))
(2,Map(id -> b, name -> Banana))
(3,Map(id -> c, name -> Carrot))

Now let's say we have new information for two of these docs, so we do this:

val df = sc.makeRDD(
  Seq(
    (Map(ID -> 1), Map("Skate" -> "board")), 
    (Map(ID -> 2), Map("Golf" -> "club"))
  ))

EsSpark.saveToEsWithMeta(df, "jstest/device", esConfig)

However, we end up overwriting these documents instead of appending the new attributes to them.

val df2 = EsSpark.esRDD(sc, "jstest/device", esConfig)
df2.take(3).foreach(println)
(1,Map(Skate -> board))
(2,Map(Golf -> club))
(3,Map(id -> c, name -> Carrot))
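
The result above is consistent with the difference between an index action, which replaces the stored document, and a partial update, which merges new fields into it. A minimal in-memory sketch of the two behaviours, using plain Scala maps rather than Elasticsearch (the names WriteSemantics, index, and update are made up for illustration, and nested-object merging is left out):

```scala
object WriteSemantics {
  type Doc = Map[String, Any]
  type Store = Map[Int, Doc]

  // "index": the incoming document replaces whatever is stored under the id
  def index(store: Store, id: Int, doc: Doc): Store =
    store + (id -> doc)

  // "update" with a partial document: incoming fields are merged into the
  // stored document. With upsert = true a missing document is created;
  // otherwise a missing document is treated as an error.
  def update(store: Store, id: Int, doc: Doc, upsert: Boolean = false): Store =
    store.get(id) match {
      case Some(existing) => store + (id -> (existing ++ doc))
      case None if upsert => store + (id -> doc)
      case None           => throw new NoSuchElementException(s"document $id is missing")
    }
}
```

With a store holding document 1 as Map("id" -> "a", "name" -> "Apple"), index leaves only the Skate field, while update keeps id and name and adds Skate.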

The ES _bulk API has an update action. For example, we can do an initial bulk upload:

POST devices-v1/device/_bulk
{"index":{"_id":"3FB5CE7C0B7A"}}
{"worstgolfer":"Dave"}

And the document looks like

GET /devices-v1/device/3FB5CE7C0B7A/_source
{
   "worstgolfer": "Dave"
}

Now we can add a new key/value to that document, and upsert a second document that does not exist yet.

POST devices-v1/device/_bulk
{ "update" : {"_id" : "3FB5CE7C0B7A"} }
{ "doc" : {"best-golfer":"Spooner"} }
{"update":{"_id":"A5584682386F"}}
{ "doc" : {"best-golfer":"Spooner"}, "doc_as_upsert" : true }

And you can see we didn't overwrite the original document

GET /devices-v1/device/3FB5CE7C0B7A/_source
{
   "worstgolfer": "Dave",
   "best-golfer": "Spooner"
}
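
For reference, the update request body shown earlier is two JSON lines per action: the action line, then the partial document. A rough sketch of how those lines could be assembled in Scala follows. BulkUpdate and its methods are hypothetical helpers, not part of es-hadoop, and the sketch assumes the id needs no JSON escaping and the partial document is already valid JSON:

```scala
object BulkUpdate {
  // Builds the two lines for one "update" action.
  // Assumption: `id` contains no characters that need JSON escaping,
  // and `partialDocJson` is already a valid JSON object.
  def actionLines(id: String, partialDocJson: String, upsert: Boolean = false): Seq[String] = {
    val action = s"""{"update":{"_id":"$id"}}"""
    val payload =
      if (upsert) s"""{"doc":$partialDocJson,"doc_as_upsert":true}"""
      else s"""{"doc":$partialDocJson}"""
    Seq(action, payload)
  }

  // A bulk body is one JSON object per line, with a trailing newline
  def body(actions: Seq[Seq[String]]): String =
    actions.flatten.mkString("", "\n", "\n")
}
```

Calling actionLines once without upsert for 3FB5CE7C0B7A and once with upsert = true for A5584682386F, then passing both to body, yields the same four lines as the request above, minus the optional whitespace.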

So what can I do to get the es-hadoop library to use "update" instead of "index" in the _bulk API?

My other idea was to use _mget to fetch all the documents and then map the new values onto each of the results, but I'm not sure if _mget is supported.

I'd also like to hear how people are handling updates in their applications.


(James Baiera) #2

Hello!

Elasticsearch for Apache Hadoop supports the following write operations: index (default), create, update, and upsert (which is just a modified update).

Please take a look at this documentation page for information about the different configurations that you can employ to change how the connector creates bulk requests to Elasticsearch.


(Jonathan Spooner) #3

Ah, I see: you set 'es.write.operation' in the config. This works as expected.

var esConfig:Map[String,String] = Map(
  "es.write.operation" -> "upsert"
)
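
To avoid typos in the operation name, the setting could be applied through a small helper. This is only a sketch: EsWriteConfig is a made-up name, the allowed values are the four operations mentioned in this thread, and the host in the usage comment is a placeholder.

```scala
object EsWriteConfig {
  // Write operations mentioned in this thread
  val Operations: Set[String] = Set("index", "create", "update", "upsert")

  // Returns a copy of `base` with es.write.operation set, rejecting unknown values
  def withOperation(base: Map[String, String], op: String): Map[String, String] = {
    require(Operations.contains(op), s"unsupported es.write.operation: $op")
    base + ("es.write.operation" -> op)
  }
}

// Usage (needs Spark and es-hadoop on the classpath; host is a placeholder):
//   val esConfig = EsWriteConfig.withOperation(Map("es.nodes" -> "localhost:9200"), "upsert")
//   EsSpark.saveToEsWithMeta(df, "jstest/device", esConfig)
```

With "upsert", re-running the second save from the first post should merge the new fields into documents 1 and 2 instead of replacing them.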
