How to handle data that causes failures while indexing from Spark to ES

I'm using PySpark to index 8 TB of data into an Elasticsearch index. My code looks something like this:

df.write.format("org.elasticsearch.spark.sql").mode(
    'append'
).option(
    "es.mapping.id", "id"
).options(
    **self.es_config
).option(
    "es.resource", myidx
).save()

This works fine until I get through around 1/8 of the data, at which point one particular task fails:

2017-09-05 11:59:21,627 WARN  rg.apache.spark.scheduler.TaskSetManager  - Lost task 113.0 in stage 14.0 (TID 53197, xyz.def.net, executor 33): 
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[https://a.net:9200, https://b.net:9200, https://c.net:9200]]

No matter how I tweak the Spark and Elasticsearch bulk indexing settings (fewer executors, various batch sizes, more write retries, longer waits between write retries), the job always fails at that same task and stage, which leads me to believe that there's a specific piece of data that Elasticsearch is rejecting. However, I'm at a loss as to how to find out which part of the data this is: I have 8 TB of data, and I can otherwise work with it purely in Spark without a problem.

Is there some way for me to capture the data being sent in the task that's causing trouble? Based on what I see here, it looks like there's no way to catch the exception, store the bad data, and continue with the job. Is there anything else I can do? At the moment I can't write my data to Elasticsearch at all because of this failure; the entire job gets killed.

This is a common problem in the community, and we're taking steps to let users intercept these data problems through a low-level failure-handling API. The design for this is still in progress, and it may be a little while before it becomes widely available. In the meantime, you could try raising the logging levels to see whether that turns up anything during request processing. In later versions of the connector (the 5.x series), when a bulk operation fails, it prints out a sample of the documents that failed to be ingested.
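Until such an API exists, one possible workaround is to narrow the failure down by bisection: retry the failing batch in halves until only the records that fail on their own remain. The following is only a minimal sketch of that idea, not something the connector provides. `find_failing_records` and `try_write` are hypothetical names; `try_write` stands for whatever callable you use to send a small batch to Elasticsearch and is assumed to raise an exception on failure. Because the job sets `es.mapping.id`, re-sending a record should overwrite the same document rather than create a duplicate, assuming the default index write operation.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def find_failing_records(
    records: Sequence[T],
    try_write: Callable[[Sequence[T]], None],
) -> List[T]:
    """Return the records that still fail when written on their own.

    try_write is a hypothetical callable: it sends one batch to the sink
    and raises an exception if the write fails.
    """
    if not records:
        return []
    try:
        try_write(records)
        return []  # the whole batch went through, nothing to report
    except Exception:
        if len(records) == 1:
            # A single record that fails by itself is a culprit.
            return [records[0]]
        # Split the batch and retry each half separately.
        mid = len(records) // 2
        left = find_failing_records(records[:mid], try_write)
        right = find_failing_records(records[mid:], try_write)
        return left + right
```

This only isolates failures tied to individual records. If the error depends on batch size or on cluster load rather than on the content of a document, every half may succeed and the function returns an empty list, which is itself a useful hint that the data is not the cause.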
