Performance degradation when writing to AWS elasticsearch using elasticsearch-hadoop library

Hi,

I am using the es-hadoop library to write data to AWS Elasticsearch through EMR. The input data is around 4 GB and the input split size is 64 MB. I am using a map-only job that writes through EsOutputFormat. I can see that es-hadoop creates 28 reduce tasks, but only the first reduce task is actually writing to the Elasticsearch index. My index has the number of shards set to 5, which is the default. Since there is just one reduce task actually writing to AWS ES, the job takes a long time to complete. How does es-hadoop come up with the number of map and reduce tasks based on the Hadoop input splits and the Elasticsearch index shards? Also, could this be related to the performance degradation of turning off node discovery on WAN setups such as AWS Elasticsearch?
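
For reference, here is a rough Scala sketch of how the job is wired up; the endpoint, index, and mapper names are placeholders rather than my actual values:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.elasticsearch.hadoop.mr.EsOutputFormat

val conf = new Configuration()
conf.set("es.nodes", "https://my-domain.us-east-1.es.amazonaws.com")  // placeholder endpoint
conf.set("es.port", "443")
conf.set("es.nodes.wan.only", "true")        // node discovery is turned off for AWS ES
conf.set("es.resource", "my-index/my-type")  // placeholder index/type

val job = Job.getInstance(conf, "write-to-aws-es")
job.setSpeculativeExecution(false)           // recommended when writing to ES
job.setOutputFormatClass(classOf[EsOutputFormat])
// job.setMapperClass(classOf[MyMapper])     // your mapper that emits the documents
job.setNumReduceTasks(0)                     // map-only: write parallelism = number of input splits
job.waitForCompletion(true)
```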

When writing, ES-Hadoop doesn't create any tasks per se; it's the source (your mapper/file) that decides how many tasks there are.
If only one task is actually writing, there can be a couple of reasons:

  1. you put all the data under the same key (so the entire data set literally ends up in one place).
  2. your source/file is not splittable, which means there can be only one reader (see the sketch after this list).
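
As a quick way to check the second point: a compressed input is only splittable if its codec supports splitting. This small sketch mirrors what TextInputFormat does internally (the paths are just examples):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}

// A file is splittable if it is uncompressed, or compressed with a codec
// that implements SplittableCompressionCodec (e.g. bzip2, but not gzip).
def isLikelySplittable(conf: Configuration, path: Path): Boolean = {
  val codec = new CompressionCodecFactory(conf).getCodec(path)
  codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}

val conf = new Configuration()
println(isLikelySplittable(conf, new Path("/input/data.gz")))  // false -> one reader only
println(isLikelySplittable(conf, new Path("/input/data.txt"))) // true  -> one reader per split
```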

Hope this helps,

Hi @costin,

Can you expand on the first point, putting all the data under the same key? What does that mean?
I have a similar issue with only one task writing (when calling rdd.saveToEs), even though I have many tasks reading and mapping the data, and I know the data is partitioned as expected.

Thanks,

Hi @larghir,

Data that is written out of a Map task with a given key will always go to the same Reduce task. If the entire data set is written out from the Map tasks under the same key, then the entire data set will be shuffled to a single reducer.
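
To make that concrete, the default partitioner (assuming no custom partitioner is configured) picks the reducer purely from the intermediate key's hash, so a constant key routes every record to the same reducer. A minimal sketch of that logic, with an illustrative key and reducer count:

```scala
// Same formula as Hadoop's default HashPartitioner.
def reducerFor(key: Any, numReduceTasks: Int): Int =
  (key.hashCode & Int.MaxValue) % numReduceTasks

val numReducers = 28
val targets = Seq("same-key", "same-key", "same-key").map(reducerFor(_, numReducers))
println(targets)  // e.g. List(n, n, n) -- every record lands on one reducer
```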

Try drilling into each reduce task and checking its counters for the reduce input groups metric (as opposed to the regular input records). That metric tells you how many distinct keys the values have been bucketed under. For the reducers that are not writing, look at their counters and see whether they are actually receiving any input from the mappers. If the inputs are lopsided, the problem is most likely the intermediate key being produced.
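
If you prefer to check this programmatically rather than in the job history UI, the same counters can be read as job-level aggregates through the mapreduce API. A sketch, assuming you have a handle on the completed Job:

```scala
import org.apache.hadoop.mapreduce.{Job, TaskCounter}

// One group with many records means every value was shuffled under a single key.
def keySkewSummary(job: Job): Unit = {
  val counters = job.getCounters
  val groups   = counters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getValue
  val records  = counters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue
  println(s"distinct intermediate keys (groups): $groups, reduce input records: $records")
}
```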

Hi @james.baiera,

thanks a lot for your answer!
So I am calling saveToEs on an RDD[Map[String, Any]]; does that mean all the data has the same key?
And mapping the entries to (id, valueMap) tuples and calling saveToEsWithMeta should help, right?
I am still trying to figure out how this should work...
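Something along these lines is what I have in mind (the index name and fields are made up):

```scala
import org.apache.spark.SparkContext
import org.elasticsearch.spark._

def save(sc: SparkContext): Unit = {
  // made-up data: (document id, document body)
  val docs = sc.makeRDD(Seq(
    (1, Map("name" -> "first",  "value" -> 10)),
    (2, Map("name" -> "second", "value" -> 20))
  ))
  // the first element of each tuple is used as the document _id
  docs.saveToEsWithMeta("my-index/my-type")
}
```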

Thanks,

@larghir The situation with keys being shuffled to one reducer is primarily a MapReduce concern. A Spark RDD will write out to Elasticsearch in parallel, using however many partitions it has at the time of the save. Write parallelism also depends on your RDD layout, your configuration, and the resources available in your environment.
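
As a rough illustration (the index name and partition count are just examples), if your RDD has collapsed to very few partitions before the save, repartitioning it is how you would restore parallel writes:

```scala
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._

// Each partition is written by its own task, so more partitions means more
// concurrent bulk writers (bounded by the available executor cores).
def saveInParallel(docs: RDD[Map[String, Any]]): Unit = {
  val partitioned =
    if (docs.getNumPartitions < 8) docs.repartition(8)  // 8 is an arbitrary example
    else docs
  partitioned.saveToEs("my-index/my-type")              // placeholder index/type
}
```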