Document Bailing Out Errors in Bulk Update Mode Resulting in Crashing the Entire Spark Job

(Rajivchodisetti) #1

Hi guys,

First of all, I'll explain my use case and the problem I'm facing right now.

We have a use case where the data is modeled as time-series indices, i.e. one index per month, and each monthly index contains data at (date + product_id) grain. These indices carry a few product attributes that change over time. We can't model this as a parent-child relationship (parent holding the product attributes, child holding the metric fields) because of limitations such as filters spanning both parent and child contexts at once, e.g. (MRP > 999 OR ROS > 5): MRP is a dimensional attribute and so lives on the parent, while ROS (rate of sale) is a field calculated in the child context.

So, because of this limitation, we have de-normalised the structure, and whenever a product attribute changes we revisit all the entries in all the indices and update them.

Right now I'm using the ES-Hadoop bulk update API, and the problem I'm facing with it is that if a (date + product_id) combination doesn't exist, that update fails with a document-bailing-out error and eventually the Spark job fails too.

To avoid these errors, every day I join the product updates with the fact table to figure out whether a record exists in ES for that particular day, and route only the matching updates to ES. This join against the fact table over a 24-month window (24 monthly indices) is quite expensive and doesn't scale.
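For concreteness, the existence filter described above boils down to keeping only the update keys that already appear in the fact table. A minimal pure-Scala sketch (the names `Key` and `routeExisting` are illustrative, not from the actual job; in Spark this would typically be a `left_semi` join on the key columns):

```scala
// Sketch of the existence filter: keep only the (date, product_id) updates
// that already have a matching row in the fact table. Illustrative names only.
case class Key(date: String, productId: String)

def routeExisting(updates: Seq[Key], factKeys: Set[Key]): Seq[Key] =
  updates.filter(factKeys.contains) // drop updates with no matching fact row
```

Doing this across 24 monthly partitions every day is where the cost comes from, since the fact-table side of the join has to be scanned (or at least shuffled) each time.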

So, is there any way for ES to just ignore updates whose target doc doesn't exist, without failing the entire Spark job? If that's possible, I can skip the expensive fact-table join, send every possible update combination to ES, and have the updates with no matching doc simply be ignored.

My config:

```scala
val elasticConf = collection.mutable.Map[String, String]()
elasticConf += ("es.nodes" -> AppConfig.config.fifa_es_nodes)
elasticConf += ("es.write.operation" -> "update")
elasticConf += ("es.clustername" -> AppConfig.config.cluster_name)
elasticConf += ("es.port" -> "9200")
elasticConf += ("es.resource" -> indexName)
elasticConf += ("es.nodes.discovery" -> "false")
elasticConf += ("es.nodes.wan.only" -> "true")
elasticConf += ("es.batch.write.retry.count" -> "2")
elasticConf += ("es.batch.size.entries" -> "1000")
elasticConf += ("es.batch.size.bytes" -> "20mb")

if (!StringUtils.isEmpty(primary_key))
  elasticConf += ("" -> primary_key)
```


(James Baiera) #2

As of 6.2 you might be able to use a custom bulk failure handler to ignore update failures and continue processing, but I feel compelled to mention that this sort of write pattern does not seem well suited to Elasticsearch. It might make sense to give your data model some more thought so that it works more naturally with Elasticsearch.

Barring all else, the bulk handler will most likely help you get past the issue, but you should be prepared to spend a decent amount of compute time ignoring errors in that case.
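To illustrate the suggestion above: ES-Hadoop's bulk write error handlers (added in 6.2) let you plug a class into the write path that inspects each failed bulk entry and decides whether to swallow it or pass it on. A hedged sketch follows — the `BulkWriteErrorHandler` / `HandlerResult` types and the `es.write.rest.error.handlers` settings come from the ES-Hadoop error-handler API, while the handler label (`ignoreMissing`), package, and class name are placeholders:

```scala
package com.example.handlers // placeholder package

import org.elasticsearch.hadoop.handler.HandlerResult
import org.elasticsearch.hadoop.rest.bulk.handler.{BulkWriteErrorHandler, BulkWriteFailure, DelayableErrorCollector}

// Swallows "document missing" update failures (HTTP 404) so the job keeps going;
// anything else is passed along to the next handler / default failure behavior.
class IgnoreMissingDocHandler extends BulkWriteErrorHandler {
  override def onError(entry: BulkWriteFailure,
                       collector: DelayableErrorCollector[Array[Byte]]): HandlerResult = {
    if (entry.getResponseCode == 404) HandlerResult.HANDLED
    else collector.pass("not a missing-document failure")
  }
}
```

Registering it would then be two more entries in the existing config map (handler label is a placeholder):

```scala
elasticConf += ("es.write.rest.error.handlers" -> "ignoreMissing")
elasticConf += ("es.write.rest.error.handler.ignoreMissing" -> "com.example.handlers.IgnoreMissingDocHandler")
```

As noted above, each ignored failure still costs a round trip and handler invocation, so the compute spent generating and discarding non-matching updates doesn't go away.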

(Rajivchodisetti) #3

Thanks James, I'll take a look at the es-hadoop 6.2 bulk failure handler.

And I do agree that the data model is not optimal, but after a month of brainstorming and evaluating multiple approaches we settled on this one. I'll keep thinking about how to avoid unnecessary look-ups for updates, though. One approach I can immediately think of is maintaining a product master with one entry per product, recording when it first and last appeared, which should limit the number of updates.
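The product-master idea above can be sketched in a few lines: if we track the first and last month each product appeared in, updates only need to be routed to the monthly indices inside that window instead of all 24. All names here (`ProductMaster`, `indicesFor`, the `products-YYYY-MM` index pattern) are hypothetical, just to show the pruning:

```scala
import java.time.YearMonth

// One product-master entry per product: when it first and last appeared.
final case class ProductMaster(productId: String, firstSeen: YearMonth, lastSeen: YearMonth)

// Hypothetical monthly index naming scheme, e.g. "products-2019-02".
def indexName(m: YearMonth): String = f"products-${m.getYear}%04d-${m.getMonthValue}%02d"

// Only the monthly indices between firstSeen and lastSeen need this product's updates.
def indicesFor(p: ProductMaster): Seq[String] =
  Iterator.iterate(p.firstSeen)(_.plusMonths(1))
    .takeWhile(!_.isAfter(p.lastSeen))
    .map(indexName)
    .toSeq
```

For a product seen only from November 2018 to February 2019, `indicesFor` yields just 4 index names instead of 24, which bounds both the number of updates sent and the window any remaining existence check has to cover.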

(system) #4

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.