ES-Hadoop peer-to-peer architecture


(Jonathan Spooner) #1

In Elasticsearch for Hadoop, on page 137, it says:

Here, we can see the correlation between the units of parallelism of the two systems.
The preceding diagram shows that there are two discrete distributed systems on the two sides: Hadoop and Elasticsearch. The Hadoop cluster can have several nodes in it, but only three nodes with Hadoop tasks are highlighted for simplicity. On the other hand, Elasticsearch also has three nodes in the cluster, with three shards, each having one replica. Each node or task on one side of the diagram can talk directly to a node or task on the other side. Thus, it provides the peer-to-peer architecture.

ES-Hadoop respects the analogy of splits and shards to enable dynamic parallelism. It means that when you import data from Hadoop to Elasticsearch, ES-Hadoop doesn't just blindly dump the data into any of the Elasticsearch nodes, but takes into account the number of shards and divides the indexing requests among the available shards. The same holds true when we read the data from Elasticsearch and write it to HDFS using ES-Hadoop.

I'm trying to get a better understanding of the code behind this, because my Spark job is overloading ES and I'm seeing the error Could not write all entries [27/324416] (maybe ES was overloaded?).

My ES cluster consists of 15x c3.8xlarge instances, and I have the indexes and settings set up for bulk loading. On the other side I have a 13-node Spark cluster. My job has two RDDs, one with 200 partitions and the other with 600. Passing these directly to EsSpark.saveWithMetadata overloads ES very quickly.

I attempted using rdd.repartition on these RDDs to scale them down to match the number of ES nodes. That was a big improvement, but after an hour of loading it still overloads ES.
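For context, this is roughly what the write path looks like (a stripped-down sketch; the index name and the shape of the pair RDD are placeholders, not my actual code):

```scala
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

// Placeholder pair RDD: with the *WithMeta variants, the key of each pair is
// used as the document metadata (here, just the document ID).
def writeEvents(events: RDD[(String, Map[String, Any])]): Unit = {
  // The upstream RDDs have 200/600 partitions, i.e. up to 600 concurrent
  // writers; repartition scales that down before handing off to ES-Hadoop.
  val writers = events.repartition(13)
  EsSpark.saveToEsWithMeta(writers, "myindex/mytype")
}
```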

My ES cluster topology consists of 2x master and client nodes with 13 data nodes.

I've been digging around AbstractEsRDD, RestService.java, and PartitionWriter to see if I could find the code that repartitions my RDD to match the number of ES shards. I believe that's how it's supposed to work.

Do you have any advice for debugging, or for getting better log output, so I can better manage the number of Spark tasks?


(James Baiera) #2

I'm not familiar with this book. The versions it covers are very old (1.7.x), so I would take what you read in it with a grain of salt, as things may have changed dramatically between then and the more current, better-supported versions of the Elastic stack.

This is true to a certain degree, but I think it's worth extra clarification:

There are really two numbers at play here: the number of Hadoop/Spark tasks that are writing, and the number of shards on ES that can accept the writes. ES-Hadoop identifies the locations of all the primary shards on the Elasticsearch cluster that you will be writing to and deals those node locations out to the running tasks in a round-robin fashion. This distributes the write load evenly over the primary shards in your cluster. In the case of a node failure, ES-Hadoop will dynamically rebalance the writes to a different Elasticsearch node.
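To make the round-robin idea concrete, here is an illustration only — not the actual connector code, which lives in RestService and related classes:

```scala
// Illustration of the dealing scheme: every writing task gets pinned to one
// primary-shard location, assigned round-robin across the available shards.
case class ShardLocation(node: String, shardId: Int)

def assignWriters(primaries: Seq[ShardLocation], numTasks: Int): Map[Int, ShardLocation] =
  (0 until numTasks).map(task => task -> primaries(task % primaries.size)).toMap

// 6 writing tasks over 3 primary shards -> each shard receives bulk requests
// from exactly 2 tasks.
val primaries = Seq(
  ShardLocation("es-data-1", 0),
  ShardLocation("es-data-2", 1),
  ShardLocation("es-data-3", 2)
)
assignWriters(primaries, 6).toSeq.sortBy(_._1).foreach {
  case (task, shard) => println(s"task $task -> ${shard.node} (shard ${shard.shardId})")
}
```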

What ES-Hadoop does not do is make any changes to the number of Hadoop/Spark tasks when writing to Elasticsearch. Everyone's ingest situation is different. Some clusters are quiet and can ingest data at three writing tasks per primary shard, and some clusters are too busy for writes and can only stomach a fraction of writing tasks. It's too situational of a property for us to assume total control over.

When writing a large amount of data to Elasticsearch from Hadoop or Spark, we advise starting with a number of writing tasks that matches the number of shards that can accept the writes, and tuning from there: increase the partitions if the cluster is not busy, decrease them if the cluster is still experiencing problems.

Enabling TRACE-level logging on the org.elasticsearch.hadoop.rest.commonshttp package will allow you to inspect the request and response traffic between your job and Elasticsearch. In ES-Hadoop 5.0.0 we have added more information to the logs when a bulk request fails to assist in troubleshooting the problem. Other options can be tuned on the Elasticsearch side to increase indexing performance. As with all performance tuning, make sure to set a goal and measure every change.
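As a rough sketch of that starting point (the shard count, index name, and setting values here are placeholders, not recommendations):

```scala
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

// Placeholder values throughout: look up the real primary shard count for the
// target index (e.g. with _cat/shards) and treat the batch settings as knobs
// to measure against, not as prescribed numbers.
def tunedWrite(docs: RDD[Map[String, Any]]): Unit = {
  val primaryShards = 26                         // illustrative
  val writers = docs.repartition(primaryShards)  // start: one writing task per primary shard

  EsSpark.saveToEs(writers, "myindex/mytype", Map(
    "es.batch.size.entries"      -> "1000",  // documents per bulk request (default 1000)
    "es.batch.size.bytes"        -> "1mb",   // payload size per bulk request (default 1mb)
    "es.batch.write.retry.count" -> "3",     // retries before a bulk failure is reported
    "es.batch.write.retry.wait"  -> "10s"    // wait between document retries
  ))
}
```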


(Thomas Decaux) #3

Did you tune Elasticsearch to be bulk-indexing friendly (refresh_interval, analyzers, merge policy, shard routing, ...)?

Then monitor ES CPU, thread pools, and network bandwidth. Start your tests with a single Spark worker, then increase to 2, 3, etc.

You should discover a limit between indexing speed and ES health; don't cross that limit :wink:
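For the refresh_interval part, a sketch of what that could look like around a big load (the endpoint and index name are placeholders; adapt them to your cluster):

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

// Sketch only: disables refreshes for the duration of the load via the index
// settings API, then restores a sane interval afterwards.
def setRefreshInterval(interval: String): Unit = {
  val conn = new URL("http://es-client:9200/myindex/_settings")
    .openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("PUT")
  conn.setDoOutput(true)
  conn.setRequestProperty("Content-Type", "application/json")
  val body = s"""{"index": {"refresh_interval": "$interval"}}"""
  conn.getOutputStream.write(body.getBytes(StandardCharsets.UTF_8))
  println(s"refresh_interval=$interval -> HTTP ${conn.getResponseCode}")
  conn.disconnect()
}

setRefreshInterval("-1")   // before the bulk load
// ... run the Spark job ...
setRefreshInterval("30s")  // after the bulk load
```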


(Jonathan Spooner) #4

No, I had to pause this part of the job, but I should be resuming tomorrow. I'll drop a comment with the results from my next load. Thanks!


(Jonathan Spooner) #5

I'm seeing some things in the logs, but nothing that helps me. I'd like to confirm that my Spark tasks are indeed connecting directly to the primary shards. Are there any logs for this?

Notes:

  • Using EMR 5.0.1 with es-hadoop org.elasticsearch:elasticsearch-spark-20_2.11:5.0.0
  • Logging options can be found here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/logging.html
  • In log4j.properties I set log4j.category.org.elasticsearch.spark=TRACE and log4j.category.org.elasticsearch.hadoop.rest.commonshttp=TRACE; not sure which one works (see the snippet below).
  • Spark usage: spark-shell --files log4j.properties
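A quick way to try both candidate loggers at once from spark-shell (driver side only, since it just changes the current JVM; the executors should still pick up the log4j.properties shipped with --files):

```scala
import org.apache.log4j.{Level, Logger}

// Raise both candidate loggers to TRACE in the current (driver) JVM.
Logger.getLogger("org.elasticsearch.spark").setLevel(Level.TRACE)
Logger.getLogger("org.elasticsearch.hadoop.rest.commonshttp").setLevel(Level.TRACE)
```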

(Thomas Decaux) #6

One thing I always use with ES-Hadoop is a simple web proxy, to see what's going on behind the scenes.

That helps a lot with debugging.
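If the proxy runs locally, the connector can be pointed at it via its HTTP proxy settings (the host and port below are placeholders):

```scala
// Placeholder host/port: routes the connector's HTTP traffic through a local
// debugging proxy (mitmproxy, Charles, ...) so every bulk request/response can
// be inspected. Pass this map as the cfg argument of saveToEs/saveToEsWithMeta.
val proxyCfg = Map(
  "es.nodes"               -> "es-client:9200",
  "es.net.proxy.http.host" -> "localhost",
  "es.net.proxy.http.port" -> "8080"
)
```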

