I am having trouble writing to a 5-node Elasticsearch cluster with Spark. Some basic details about the cluster:
5 nodes
6TB of disk
5x28 GB of RAM (half is Heap)
5x6 CPU
Allocated 140 shards for the relevant index
I have 3.8TB of data (~80 million documents, including nested fields) in Parquet files that I would like to insert into the index. However, some bulk requests keep failing with the org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed error.
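For reference, the write roughly looks like the following sketch (the Parquet path, hostnames, and index name here are placeholders, not our actual values):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parquet-to-es").getOrCreate()

// ~3.8 TB of Parquet, ~80 million documents with nested fields
val df = spark.read.parquet("s3://my-bucket/path/to/parquet")

df.write
  .format("org.elasticsearch.spark.sql")   // elasticsearch-hadoop Spark SQL connector
  .option("es.nodes", "elasticsearch-0.elasticsearch.main.example.com")
  .option("es.port", "443")
  .option("es.net.ssl", "true")
  .mode("append")
  .save("my-index")                        // the index with the 140 shards
```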
I have noticed the following (I think) relevant details:
Some of the records that I insert are very big (we have seen an error saying a document was rejected because it exceeded the limit of 256MB). We do not know exactly what its actual size was.
Most of the records must be much smaller, but, as mentioned, some of them are quite big.
Bulk inserts that include such big records seem to fail frequently with the above error.
I am currently using the default es.batch.size.bytes and es.batch.size.entries. With the defaults, a batch presumably contains only a single record when the documents are that large.
The nodes are quite active, but do not seem to be really overloaded.
Now, my question is: what can I do about this problem? What I am considering:
Changing the mapping slightly, which could reduce the size of the average record by about a third. But there will still be big documents.
Increasing es.http.timeout, es.batch.write.retry.count, and es.batch.write.retry.wait; however, I am unsure whether this will give the desired effect (a sketch of how these settings would be passed is shown after this list).
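For clarity, this is roughly how those settings would be passed on the write (the values below are just illustrative, not tuned recommendations):

```scala
df.write
  .format("org.elasticsearch.spark.sql")
  // Bulk sizing per task (defaults: 1mb / 1000 entries). With very large documents a single
  // record can already exceed es.batch.size.bytes, so a bulk may end up holding just one doc.
  .option("es.batch.size.bytes", "4mb")
  .option("es.batch.size.entries", "500")
  // Timeout and retry behaviour for bulk requests (defaults: 1m timeout, 3 retries, 10s wait).
  .option("es.http.timeout", "5m")
  .option("es.batch.write.retry.count", "6")
  .option("es.batch.write.retry.wait", "30s")
  .mode("append")
  .save("my-index")
```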
It sounds like you're running into a quirk of the way es-hadoop behaves. It keeps a list of all of the Elasticsearch nodes you tell it about. If it receives a failure from one of those nodes, it effectively strikes that node from the list of nodes it will talk to for the rest of the time the client is active. That is true even if the exception is not the fault of the Elasticsearch node (for example, if you ask it to index a document that is bigger than allowed). So in your case, since you have 5 nodes, if a single task hits 5 documents that are bigger than 256 MB, that task will strike every Elasticsearch node from its list. That task will never complete even if you have retries enabled (since the same 5 documents will fail every time), and your whole job will fail. There are a few ways to work around this:
Filter your data in Spark. That is, make sure you are not sending documents larger than 256 MB to Elasticsearch, since they are not going to work anyway. This is the best option if you can do it (see the size-filter sketch after this list).
Generate more tasks. You fail once you have 5 bad documents in a single task. If you have more tasks, you have fewer documents per task, and you're less likely to hit 5 bad ones.
Artificially add nodes to your es.nodes list. I assume you're listing all 5 elasticsearch nodes in es.nodes. One hack we have used in the past is to add multiple DNS entries per node, and to put all of those into es.nodes. This way you could make it appear to es-hadoop that there are 10 or 15 nodes (or more), so that it would take more failures in a task to kill the task.
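For the filtering option, something along these lines could work. This is a rough sketch: it assumes a DataFrame df read from your Parquet files, serialises each row to JSON, and uses the string length as an approximation of the document size; the column and index names are made up.

```scala
import org.apache.spark.sql.functions.{col, length, struct, to_json}

// Rough size guard: serialise each row to JSON and treat its length as an
// approximation of the document size Elasticsearch will see.
val maxSize = 256L * 1024 * 1024   // mirrors the 256 MB limit mentioned above

val withSize = df.withColumn("doc_size", length(to_json(struct(df.columns.map(col): _*))))

val tooBig   = withSize.filter(col("doc_size") >= maxSize)               // inspect or log these separately
val writable = withSize.filter(col("doc_size") <  maxSize).drop("doc_size")

writable.write
  .format("org.elasticsearch.spark.sql")
  .mode("append")
  .save("my-index")
```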
Well, actually we have provided a single node for the es.nodes parameter, because we assumed that the nodes were automatically discovered:
Note that the list does not have to contain every node inside the Elasticsearch cluster; these are discovered automatically by elasticsearch-hadoop by default (see below)
Do you think it is worth trying to provide the full list of nodes?
Furthermore,
We have increased the maximum allowed payload size beyond the 256MB threshold, and we no longer see the Elasticsearch 413 Request Entity Too Large error that we saw before. We did still see the org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed error. My guess is that the records are actually getting indexed, but that it takes so long that the requests time out. Although I am not sure.
I have put some effort into making the documents smaller, which seems to help.
I have increased the default timeout to 5m. The load started out reasonably fast and no longer showed the Connection error. However, after some time the indexing rate started dropping drastically and I saw red statuses appearing.
Interestingly, I noticed the following on one of the nodes (node 1):
Notice here that the available heap on node 1 is half of that on node 0. We deploy Elasticsearch on Kubernetes and have configured all nodes exactly the same, but somehow node 1 always ends up with half the available RAM of node 0. We do not yet have a clue why this happens.
Well, the result of this is clear: that node has to do a lot of garbage collection. Any thoughts on these additional insights?
Oh, there are no more request entity too large errors? That sounds like progress. Are there other errors in the Spark task logs? There ought to be something explaining why it is giving up on a node. You will see something logged at error level like Node [<node_name>] failed (<error message>). Can you paste those here?
Well, the thing is, after increasing the HTTP payload size, decreasing document sizes slightly, and increasing the HTTP timeout, I am no longer seeing the node failures in the Spark logs. Although I do see node 1 going red every now and then. So I am not entirely sure why the Connection error is gone; it could be that we are now suppressing it with the increased timeout.
What I have seen AFTER these changes (so no more Connection error), once or twice, is the following:
Just to make sure I understand where it now stands:
The job is now completing successfully
You're just occasionally seeing the Read timed out errors
Those Read timed out errors are killing individual task attempts but the task automatically retries and succeeds.
The Node [<node_name>] failed (<error message>) messages always include no other nodes left and never include selected next node
If all of that is true, then I would probably add all 5 nodes to es.nodes so that it takes 5 failures to cause a task attempt to fail (I can't remember whether the other nodes are automatically discovered and added to that list, but if you're seeing a single failure killing a task, that's a pretty good indication that they're not). And if the error is always Read timed out, you might want to consider reducing the number of Spark tasks that are hitting the cluster at once (although there could be something else going on there).
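For the node list, something like this (hostnames are placeholders; substitute the real addresses of your 5 nodes, or several DNS aliases per node as described earlier):

```scala
// Full node list instead of a single entry, so one bad request doesn't
// immediately leave the task with no nodes to talk to.
val allNodes = Seq(
  "elasticsearch-0.elasticsearch.main.example.com",
  "elasticsearch-1.elasticsearch.main.example.com",
  "elasticsearch-2.elasticsearch.main.example.com",
  "elasticsearch-3.elasticsearch.main.example.com",
  "elasticsearch-4.elasticsearch.main.example.com"
).mkString(",")

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", allNodes)
  .option("es.port", "443")
  .option("es.net.ssl", "true")
  .mode("append")
  .save("my-index")
```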
The only thing I see in the driver logs is: Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[elasticsearch-4.elasticsearch.main.xxxxxxx:443]]. Then in the executor logs, I see the following: rest.NetworkClient (NetworkClient.java:execute(155)): Node [elasticsearch-4.elasticsearch.main.getfocus.eu:443] failed (java.net.SocketTimeoutException: Read timed out); no other nodes left - aborting.... I do not see any other error logs.
I tested an extreme case with only two Spark tasks writing to Elasticsearch, and that still gave these issues. But I have to admit I have tried so many things that I am no longer exactly sure whether other issues were affecting that trial.
Now, I am seeing some improvements:
We have made sure that all nodes have the same amount of RAM (as mentioned above, one of our nodes previously had less, and we saw high garbage collection rates on that node).
We (almost) doubled the RAM on all nodes, so that each node has 52GB of RAM.
We doubled the vCPU on all nodes, so that each node has 12 vCPU.
We increased the es.http.timeout parameter to 5m to deal with potentially long indexing times for extremely fat documents.
We tried a couple of different configurations for how many Spark tasks write to ES in parallel. Currently we have settled on 16 parallel tasks (see the sketch after this list).
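The parallelism is capped on the Spark side, roughly like this (a sketch, with df being the DataFrame from the earlier snippet):

```scala
// Cap the write stage at 16 concurrent tasks. coalesce avoids a shuffle;
// repartition(16) would rebalance partition sizes at the cost of one.
df.coalesce(16)
  .write
  .format("org.elasticsearch.spark.sql")
  .option("es.http.timeout", "5m")   // the increased timeout mentioned above
  .mode("append")
  .save("my-index")                  // plus the es.nodes / SSL options from the earlier sketches
```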
The job has now been running for 16 hours straight without any errors, and indexing rates have improved significantly. At this rate it is expected to take about 42 hours in total, so fingers crossed.