Hi all,
First, I'll explain my use case and the problem I'm facing right now.
We have modeled the data as time-series indices, i.e. one index per month, and each monthly index contains data at (date + product_id) grain. These indices also carry a few product attributes that change over time. We can't model this as a parent-child relationship (parent holding the product attributes, child holding the metric fields) because of a limitation: filters that span both parent and child contexts at once, such as (MRP > 999 OR ROS > 5), are not possible, since MRP is a dimensional attribute and so lives in the parent, while ROS (rate of sale) is a field calculated in the child context.
So, because of this limitation, we have de-normalised the structure, and whenever a product attribute changes we revisit the matching entries in all the indices and update them.
Right now I'm using the ES-Hadoop bulk update API for this. The problem I'm facing with the bulk update API is that if a (date + product_id) combination doesn't exist, that update fails with a document-missing ("bailing out") error, and eventually the Spark job fails as well.
To avoid these errors, every day I join the product updates with the fact table to figure out whether a record exists in ES for that particular day, and only route the matching updates to ES. This join against the fact table over a 24-month duration (24 monthly indices) is quite expensive and doesn't scale.
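To make the expensive step concrete, here is a minimal sketch of the existence-check logic the daily job performs, with plain Scala collections standing in for the Spark datasets and illustrative field names (`date`, `productId`, `mrp`) that are just placeholders for my real schema:

```scala
// One daily product update, keyed at the same (date + product_id)
// grain as the ES documents.
case class ProductUpdate(date: String, productId: String, mrp: Double)

// existingKeys stands in for the expensive fact-table join: the set of
// (date, product_id) combinations that actually exist in ES.
def routeExisting(
    updates: Seq[ProductUpdate],
    existingKeys: Set[(String, String)]
): Seq[ProductUpdate] =
  // Only updates whose key exists in ES are routed; the rest are
  // dropped up front so the bulk update API never sees a missing doc.
  updates.filter(u => existingKeys.contains((u.date, u.productId)))
```

In the real job this filter is a join of the update set against 24 monthly indices' worth of fact data, and that join is the part that doesn't scale.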
So, is there any way for ES to simply ignore updates whose target document doesn't exist, without failing the entire Spark job? If that is possible, I can avoid the expensive fact-table join, send all possible update combinations to ES, and have the updates with no matching document be silently ignored.
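For context, I'm aware of the upsert write operation in ES-Hadoop, but as I understand it that would create the missing (date + product_id) documents instead of skipping them, which is the opposite of what I need:

```scala
// NOT what I want: "upsert" inserts a new document when the id is
// missing, rather than silently ignoring the update.
elasticConf += ("es.write.operation" -> "upsert")
```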
My Config:
val elasticConf = collection.mutable.Map[String, String]()
elasticConf += ("es.nodes" -> AppConfig.config.fifa_es_nodes)
elasticConf += ("es.write.operation" -> "update")
elasticConf += ("es.clustername" -> AppConfig.config.cluster_name)
elasticConf += ("es.port" -> "9200")
elasticConf += ("es.resource" -> indexName)
elasticConf += ("es.nodes.discovery" -> "false")
elasticConf += ("es.nodes.wan.only" -> "true")
elasticConf += ("es.batch.write.retry.count" -> "2")
elasticConf += ("es.batch.size.entries" -> "1000")
elasticConf += ("es.batch.size.bytes" -> "20mb")
if (!StringUtils.isEmpty(primary_key))
elasticConf += ("es.mapping.id" -> primary_key)
elasticConf