Does Elasticsearch Support Partitioning by Column and Overwriting a Specific Partition's Data Within the Same Index?

Hi Team,

I have a CSV file whose data I need to store in an index ("carsdata"), partitioned by column.

Input CSV (cars-data.csv):

carname,enginetype,cost,countryId,modelnumber
BMW1,Petrol,10000,1,12
BMW2,Petrol,20000,1,12
BMW3,Petrol,30000,1,12
BMW4,Petrol,40000,1,12
BMW5,Petrol,50000,18,13
BMW6,Petrol,60000,18,13
BMW7,Petrol,70000,18,13
BMW8,Petrol,80000,18,13
BMW9,Petrol,90002,18,13

Code:

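import org.apache.spark.sql.SparkSession

object ElasticSearchWriteLocal {
  def main(args: Array[String]): Unit = {
    // Build a local session; note the builder lives on the SparkSession companion object.
    val sparkSession = SparkSession.builder().appName("WriteToElasticSearch").master("local").getOrCreate()
    // Read the CSV with its header row; all columns are read as strings.
    val dataFrame = sparkSession.read.option("header", "true").csv("cars-data.csv")

    // Write to the "carsdata" index; "overwrite" replaces the existing index data.
    dataFrame.write
      .format("org.elasticsearch.spark.sql")
      .option("es.port", "9200")
      .option("es.nodes", "localhost")
      .partitionBy("countryId", "modelnumber")
      .mode("overwrite")
      .save("carsdata/doc")
  }
}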

This code runs without any problem, and I can see the data in the index called "carsdata".

Now my requirement is to overwrite specific data by the partitionBy columns, as shown below.

Existing data in the index (carsdata) for countryId=18 & modelnumber=13:

carname,enginetype,cost,countryId,modelnumber
BMW5,Petrol,50000,18,13
BMW6,Petrol,60000,18,13
BMW7,Petrol,70000,18,13
BMW8,Petrol,80000,18,13
BMW9,Petrol,90002,18,13

Assume new data is given for countryId=18 & modelnumber=13, as below:

carname,enginetype,cost,countryId,modelnumber
BMW5,Petrol,60023,18,13
BMW6,Diesel,68444,18,13
BMW7,Petrol,84755,18,13
BMW8,Diesel,80000,18,13
BMW9,Diesel,483448,18,13

Now I want to overwrite only the countryId=18 & modelnumber=13 data with the new data shown above, without overwriting the entire index.

Could you please help me understand how to achieve this?

Also, does Elasticsearch really support partitioning by column while writing? If yes, could you please show me how to use it?

If no, why does the code run without any problem even though I have specified .partitionBy("countryId", "modelnumber")?

Please help me with this.

@dillibabu.mekala
I am not familiar with the Spark Elasticsearch integration, so you will need to translate this to Spark.

To update a document, you first need to locate it. You have two choices: locate it by _id or by running a query. Your code snippet creates documents with auto-generated ids, so you won't be able to find the _id without a lookup. You are also not applying the same change to all documents, which means you won't be able to use update by query.

I can think of two possible solutions:

  1. Create the document id yourself, like <car_name>__modelnumber. This lets you locate a document without a lookup. With this option your data may get unevenly distributed.
  2. If updates are not frequent, look up the document id by running a query.

Okay, as you said, assume I have created a carid (where carid is <car_name>__modelnumber) to locate a single document uniquely.
I have read the Elasticsearch API docs, and there are two options for updating data:

  1. Update API (updates a single record by id (carid))
  2. Update by query, but it isn't feasible if each document is updated with different data.

I observed that the Update API supports updating only a single document at a time. But I want to update 1 million records through Spark, so my question is how to do that.

I have explored one more approach as well: delete some of the data (using delete by query) as required and then append the new data. But delete by query is not performing as expected: it does not delete all the data when we have millions of records.

Here is the sample code that I executed in Kibana.

POST /carsdata/_delete_by_query
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "carname": "BMW4"
          }
        },
        {
          "match": {
            "enginetype": "Petrol"
          }
        }
      ]
    }
  }
}

Could you please help me work out how to delete some data (possibly millions of records) based on a condition without any failures, and how to integrate this with the Spark Elasticsearch API once the delete works reliably?
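For reference, here is a minimal Scala sketch of the delete step for one countryId/modelnumber pair. It is not a confirmed fix: it assumes the incomplete deletes come from version conflicts or request timeouts, and so passes the documented _delete_by_query URL parameters conflicts=proceed and wait_for_completion=false. It also assumes the two fields are mapped so that a term query matches the stored value exactly (with default dynamic mapping you may need the .keyword sub-field), that Java 11+ is available for java.net.http, and that the cluster is on localhost:9200 as in the thread. The object and method names are hypothetical.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object DeletePartitionSketch {
  // Build a _delete_by_query body that selects exactly one
  // (countryId, modelnumber) combination using non-scoring term filters.
  def deleteBody(countryId: String, modelNumber: String): String =
    s"""{"query":{"bool":{"filter":[{"term":{"countryId":"$countryId"}},{"term":{"modelnumber":"$modelNumber"}}]}}}"""

  // conflicts=proceed: keep going when a document changes during the delete.
  // wait_for_completion=false: run as a background task instead of holding
  // the HTTP request open for the whole delete.
  def deleteUrl(host: String, index: String): String =
    s"http://$host:9200/$index/_delete_by_query?conflicts=proceed&wait_for_completion=false"

  // Send the request and return the raw JSON response body.
  def send(host: String, index: String, body: String): String = {
    val request = HttpRequest.newBuilder(URI.create(deleteUrl(host, index)))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(body))
      .build()
    HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
      .body()
  }
}
```

Calling DeletePartitionSketch.send("localhost", "carsdata", DeletePartitionSketch.deleteBody("18", "13")) from the Spark driver before appending the new rows would be one way to combine the two steps; the append should only start once the delete task has finished.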

You can set the computed value as an attribute on the DataFrame row and then use es.mapping.id: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#cfg-mapping
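A rough sketch of what that could look like in Spark, untested against a live cluster. It assumes the id is built as <car_name>__modelnumber as suggested earlier in the thread, that the original load also used this carid as the document id (documents written with auto-generated ids would not be replaced, only duplicated), and that the new rows live in a hypothetical file new-cars-data.csv. es.mapping.id and es.write.operation are settings from the elasticsearch-hadoop configuration page linked above; the object name is made up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat_ws}

object UpsertPartitionSketch {
  // Connector settings: use the carid column as the document _id and
  // upsert, so existing documents are updated and missing ones are created.
  val esOptions: Map[String, String] = Map(
    "es.nodes" -> "localhost",
    "es.port" -> "9200",
    "es.mapping.id" -> "carid",
    "es.write.operation" -> "upsert"
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UpsertToElasticSearch")
      .master("local")
      .getOrCreate()

    // Hypothetical input file holding only the countryId=18, modelnumber=13 rows.
    val updates = spark.read.option("header", "true").csv("new-cars-data.csv")
      // Derive the document id as <carname>__<modelnumber>.
      .withColumn("carid", concat_ws("__", col("carname"), col("modelnumber")))

    updates.write
      .format("org.elasticsearch.spark.sql")
      .options(esOptions)
      .mode("append") // append leaves the rest of the index untouched
      .save("carsdata/doc")

    spark.stop()
  }
}
```

The connector sends documents to Elasticsearch in bulk requests, so this is not limited to one document per call the way a single Update API request is. Rows for other countryId/modelnumber values are left alone because only the ids present in the DataFrame are written.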

Could you please provide a snippet that updates multiple records at a time through Spark (using es.mapping.id, as you mentioned)? That would help me proceed further.

I am not a spark developer. Someone else with Spark experience will help.
