I have a 3 node Elasticsearch cluster configured to be fault tolerant in that the cluster will continue working fine if any 1 of the 3 nodes fail (or is just shutdown to simulate a failure). This works fine.
I have an spark streaming job that inserts data into this Elasticsearch cluster in one minute batches. When the 3 Elasticsearch nodes are running correctly, there is no issue with the Spark Streaming job and events are correctly inserted into Elasticsearch (ES).
But if I shutdown one of the ES nodes, the spark jobs fails when it tries to insert into ES with the following exception:
Caused by: java.net.NoRouteToHostException: No route to host (Host unreachable)
The ES node that was shutdown - its ip address is no longer valid on the network and that is where the issue is coming from - but the ES cluster is still working ok with just the 2 remaining nodes. Is there a solution to this issue?
Lib I'm using the following maven lib:
org.elasticsearch
elasticsearch-spark_2.10
2.3.0
This is the api calls that throws the exception:
JavaEsSpark.saveToEs
I configure the 3 ES nodes when I submit the spark streaming job initially using "spark.es.nodes"
One workaround/hack I can do that actually fixes the issue - is on the spark nodes if I log in and edit /etc/hosts and change the invalid IP to a different valid ip (another server on the network doesnt even have to be running ES) everything works fine - the spark job doesnt crash and I can see the data in KIbana. Obviously we would prefer not to do this and for the spark streaming job to handle the issue automatically itself - but it does fix the issue
This sounds like a Spark issue: it needs to fail over to another ES node when the first one is unavailable, but it sounds like it is not doing so. I do not know Spark so don't know if this is something that can be fixed there.
Possibly it's something that you could resolve by changing how your network behaves when a node is shut down:
However, even if it were a valid address on the network you'd just get a different exception, saying "connection refused" or similar. Maybe Spark handles that kind of exception differently, I don't know.
If you cannot follow either of these two paths, perhaps you could add an HTTP reverse proxy between Spark and Elasticsearch which implements the failover behaviour instead. I have experience with HAProxy and believe it would work here, although I haven't tried it in this specific setup.
Yes it does, Spark can actually handle this situation ok and I can even see the data afterwards using kibana. (It doesn't seem to matter that the server of this new/valid ip is not even running ES)
Also just to add - its not limited to just the first node in the es.nodes list - if any node/ip in the list is invalid the NoRouteToHostException occurs.
HaProxy - cool thanks we actually have it installed already, so I will try this approach.
It definitely appears the "spark to elasticsearch" lib in spark streaming - does not support this sort of fail over scenario. I even tried programmatically in the spark job to get access to the "es.nodes" list at run time, so I could remove any failed nodes before the insert. But it appears this option, isn't possible either.
I can confirm that using HaProxy does solve the issue - and I can turn ES nodes on and off and spark streaming continues to work fine and all data is viewable in kibana.
Can I ask one last question:
I just now pass the one node in es.nodes (which points to HaProxy) to spark and I let HaProxy do RR load balancing on the ES nodes. From the testing I've done so far all is working correctly, but just wondering if I scaled my cluster might HaProxy become a bottle between spark and ES? How does spark streaming cluster select nodes from es.nodes at runtime? (Does it just pick one from the list or does it open multiple connections to multiple nodes on each call to JavaEsSpark.saveToEs?)
Just curious if I'm ok doing this or should I give each Elasticsearch node an entry in HaProxy and pass the entire list to spark. Both options work fine but just wondering might the second be safer for scalability...
At the moment we just have a 2 node spark cluster and 3 node ES cluster anyway so it probably wont be an issue here anyway
I don't know, sorry, and I'm not very familiar with the project so I couldn't easily find out by reading the source. I've asked some more knowledgeable people to take a look.
HAProxy is pretty darn fast, so I expect the bottlenecks to be elsewhere. If it did become a problem I think you could always just add more HAProxies: they're effectively stateless.
@patadaptive ES-Hadoop used to fail hard on NoRouteToHostException because it was originally set as a way to notify a user earlier than later if an erroneous IP address was used in a cloud environment. That said, it was not a good idea at the time to make that assumption, since the failover logic would simply move on to the next address and attempt to operate on it until it exhausted its addresses anyway. In 5.6.0 we fixed this so that it would correctly fail over to the next host in line. https://github.com/elastic/elasticsearch-hadoop/issues/993
I see that you are using version 2.3.0 - You may need to upgrade to use ES-Hadoop 5.6.0. That version should work against the 2.x line as it has backwards compatibility functions built into it, but those tests assume the latest version on the 2.x line. That said, it works best if you can guarantee that ES-Hadoop is the same version as ES.
Apache, Apache Lucene, Apache Hadoop, Hadoop, HDFS and the yellow elephant
logo are trademarks of the
Apache Software Foundation
in the United States and/or other countries.