Delete from index


How can I issue delete operations from RDD?
Can I create a Connector instance and use it ?



Delete operation is not currently supported. The main reason behind it is that typically an index is being deleted as a whole not doc by doc. Further more, in cases where this happens, mapping this on top of Hadoop is not that 'natural' - basically one needs to provide the IDs one by one (potentially in a CSV or something similar).
Thus the entries sent to the connector are quite small and don't really match the domain.
However this doesn't mean it won't be done - there's an issue raised for it however currently it's somewhat low priority.

So what about update operations?
How do you support updating existing documents?


The documentation explains that. The difference is in case of an update, it's not just the id of the document that is passed but also the body.
Again, the process from the connector side is the same however the data that goes it significantly different.
An update is quite similar to an index as oppose to the delete.
Note that I'm not so much talking about the internal structure but rather usage.

Hi Costin,

My logic needs to perform delete operations. This is why I've asked whether I can have access to your underlying client/connection to ES. I understand that there is none. This is a bit problematic since now while using es-spark to index new data, in parallel, I have to instantiate another ES client and have it shared in the function that I use on each RDD...
If I understand correctly, the two main things that the es-spark provides are:

  1. Locality logic
  2. Convenience - just call saveJsonToES ... and it works.
    It seems that for more complicated cases, it is not enough. However, I have a suggestion, that will provide more flexibility on the ES operations that are supported while having the locality logic and the convenience of operations on RDDs:
  3. Enable some access to the underlying Connection object to ES that can be used from, for example, mapPartitions or foreachRDD.
  4. Add a method, lets call it "doOnES" that will be enabled on a RDD of org.elasticsearch.action.DocumentRequest thus enabling the user to "enjoy" the locality logic while choosing the operation.


The concept of Connection or unit of work doesn't play well when using the connector since it is not a framework but a library. To use a connection means that somebody that the user code has access to the runtime which is not the case; both Spark and Hadoop execute user code remotely while the connector plugs in as a storage and not a user API.
In other words, one describes an input/output which the connector than uses to stream data.

A Connection would offer custom write access to Elastic which, while possible, is outside the purpose of the connector itself. In fact, a user can simply create the connection directly and use the myriad of Elasticsearch clients to execute all kind of operations that go beyond the connector itself.

Indeed. It's a dedicated client for streaming data efficiently between two distributed systems. The more generic it gets, the lower level it will be and require additional custom code. This is not the intent, at least for now.

Please provide an actual example (code would help). You are thinking in terms of Template/Callback as in Spring's HibernateTemplate. This might make sense within the connector however it would need to be bound to an actual storage. And the RDD is not one in a true sense but rather a lazy/functional evaluation of data (which is not storage).

By the way, you are referring to locality logic but that implies the data location is known before hand which in turn means the operations are not as generic as a typical Connection. Also you are referring to the Java API (DocumentRequest) which is not what the Elasticsearch Spark relies on or is modelled after.

Hi Costin,

is it possible / how can I accomplish a delete of the whole index from Scala?


@RSiebeling You can't think RDDs don't offer delete methods (for good reason I would argue). You can however delete the index manually which is typically what the connector recommends anyway since it's a destructive operation and doesn't have a clear API hook in Hadoop.

P.S. Please open new threads instead of reviving old ones. Thanks!

I am creating a spark object which contains the stats of a table and writing into elasticsearch
val cases_count_stats = spark.sql("select col2,count(*) from table group by col2"

How do I delete the index everything before I create it ?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.