I'm using pyspark to index 8 TB of data into an elasticsearch index. My code looks something like this:
df.write.format("org.elasticsearch.spark.sql") \
    .mode("append") \
    .option("es.mapping.id", "id") \
    .options(**self.es_config) \
    .option("es.resource", myidx) \
    .save()
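For reference, self.es_config is just a plain dict of connector settings along these lines (the values shown here are illustrative, not my real cluster details):

es_config = {
    "es.nodes": "a.net,b.net,c.net",   # matches the nodes listed in the error below
    "es.port": "9200",
    "es.net.ssl": "true",              # the cluster is reached over https
    "es.nodes.wan.only": "true",       # illustrative -- not certain this is set in my job
}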
This works fine until I get through roughly 1/8 of the data, at which point one particular task consistently fails:
2017-09-05 11:59:21,627 WARN org.apache.spark.scheduler.TaskSetManager - Lost task 113.0 in stage 14.0 (TID 53197, xyz.def.net, executor 33):
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[https://a.net:9200, https://b.net:9200, https://c.net:9200]]
No matter how I tweak the Spark and Elasticsearch bulk-indexing settings (fewer executors, various batch sizes, more write retries, longer waits between retries; a representative set of the options I've varied is sketched below), the job always fails at that same task in that same stage, which leads me to believe there's a specific piece of data that Elasticsearch is rejecting. However, I'm at a loss as to how to find out which part of the data it is, since I have 8 TB of it and can otherwise work with all of it purely in Spark without a problem.
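For example, one configuration that still hits the failure looks roughly like this (the values are illustrative; the option names are the standard elasticsearch-hadoop bulk settings I've been varying, passed through the same .options(**...) call as above):

# Illustrative values only -- I've swept these across several runs without success
bulk_tuning = {
    "es.batch.size.entries": "500",      # fewer documents per bulk request
    "es.batch.size.bytes": "1mb",        # smaller bulk payloads
    "es.batch.write.retry.count": "10",  # more retries per bulk request
    "es.batch.write.retry.wait": "60s",  # longer wait between retries
}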
Is there some way for me to capture the data being sent by the task that's causing trouble? Based on what I see here, it looks like there's no way to catch the exception, store the bad documents, and continue with the job; is there anything else I can do? At the moment I can't write my data to Elasticsearch at all, because this one failure kills the entire job.
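The only workaround I've come up with so far is to isolate the partition behind the failing task and dump it somewhere I can inspect or re-index separately, something like the sketch below. This is guesswork: I'm assuming task 113 of the write stage corresponds to partition 113 of df, and the output path is just a placeholder.

from pyspark.sql.functions import spark_partition_id

# Guess: tag each row with its partition id and keep only the partition
# that (I assume) backs the failing task 113.0 in stage 14.0
suspect = (
    df.withColumn("_pid", spark_partition_id())
      .filter("_pid = 113")
      .drop("_pid")
)
suspect.write.mode("overwrite").json("hdfs:///tmp/suspect_partition")  # placeholder path

Even if that worked, it would only let me inspect the data after the fact; it wouldn't keep the indexing job itself alive.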