Difference between task creation for a write and read-update-write operation in ES


(Piyush Goyal) #1

As asked by Costin, here are the questions and answers from the discussion:

Question: I saw different behavior in how tasks are created for writes
to ES while working on my project. The difference is as
follows:

    1.) Only write to ES - When I create an RDD of my own to insert
    data into ES, the tasks are created based on the properties
    "es.batch.size.bytes" and "es.batch.size.entries". Number of
    tasks created = number of documents in the RDD / the value of
    either of these properties. The request hits a node, and the node
    decides the shard to which the document needs to be routed, based
    on the routing value (if specified).

Answer: What makes you say that? In case of writing, the number of writers
is determined by the number of shards of your target index. The more
shards, the more concurrent writers. The behavior of all writers can
be further tweaked through the properties mentioned; however, they do
NOT affect the process parallelism.
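
A minimal sketch of the distinction above, in plain Python rather than
Spark. It models a single writer that buffers documents and sends a bulk
request whenever either limit would be exceeded. The function name
`count_bulk_requests`, the flush rule, and the example sizes are
assumptions made for illustration, not the connector's actual code. The
only point is that the two batch settings change how many bulk requests
a writer sends, not how many writers (tasks) exist.

```python
def count_bulk_requests(doc_sizes, max_entries, max_bytes):
    """Count bulk requests sent by ONE writer task in this simplified model.

    doc_sizes   -- size in bytes of each document handled by the task
    max_entries -- stand-in for es.batch.size.entries
    max_bytes   -- stand-in for es.batch.size.bytes
    """
    requests = 0
    entries = 0
    size = 0
    for doc_size in doc_sizes:
        # Flush the buffer first if adding this document would overflow it.
        if entries > 0 and (entries + 1 > max_entries or size + doc_size > max_bytes):
            requests += 1
            entries = 0
            size = 0
        entries += 1
        size += doc_size
    # Flush whatever is left when the task finishes.
    if entries > 0:
        requests += 1
    return requests


# One task holding 2500 documents of 100 bytes each (illustrative values).
docs_in_task = [100] * 2500
print(count_bulk_requests(docs_in_task, max_entries=1000, max_bytes=1_000_000))
print(count_bulk_requests(docs_in_task, max_entries=1000, max_bytes=50_000))
```

In both calls there is still exactly one task; only the number of bulk
requests that task issues changes.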

Question: Read-update-write to ES - Consider the case where I have to
read data from ES, store it in an RDD, update the documents in the
RDD, and then index these documents back to ES. While reading, the
number of tasks created is based on the number of shards, and I
presume that each task fetches data from one shard (I am not sure how
this works: does the task delegate the request to a node to serve
data from a particular shard?). Now when I try to update/re-index
the data using the same RDD and the function saveToEsWithMeta, the
number of tasks created does not follow the rule in point 1. If the
data in each partition is less than the property
"es.batch.size.entries", it creates as many tasks as there are
shards; otherwise it creates more.

Answer: In case of reading, the number of tasks that can work in parallel is
determined by the source parallelism - its number of partitions. So
if you have an RDD with 1 partition, it will likely result in one
task that will write to another RDD down the line. Assuming that RDD
is backed by Elastic - even if the index has 10 shards and thus can
have a parallelism of 10, if the source has only one partition and
there's only one task, there's nothing the connector can do to
increase this number.

Again, remember that the connector is not an active component per se
- rather it bridges two systems. It is Spark, and more importantly the
RDD structures, that control the number of tasks/threads that can
work in parallel at a given time.
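
To make the "one task per partition" idea concrete, here is a small
plain-Python model (no Spark involved). The helper
`slice_into_partitions` is hypothetical, and its slicing rule is an
assumption modeled on how a local collection can be cut into contiguous
chunks; it is not taken from the connector or from Spark's source. What
it shows is that the partition count of the source fixes the number of
tasks, whatever the shard count of the target index happens to be.

```python
def slice_into_partitions(docs, num_partitions):
    """Split a local list into contiguous partitions (illustrative model only)."""
    n = len(docs)
    return [
        docs[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]


docs = [f"doc-{i}" for i in range(10)]
target_index_shards = 10  # hypothetical index; plays no role in the split below

single = slice_into_partitions(docs, 1)
four = slice_into_partitions(docs, 4)

# In this model each partition is processed by exactly one task.
print("tasks with 1 partition:", len(single))
print("tasks with 4 partitions:", len(four))
print("documents per task:", [len(p) for p in four])
```

With one partition there is one task, even though the hypothetical
target index has 10 shards.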

(Piyush Goyal) #2

Hi Costin,

Thanks for all the answers. Here is another list of questions for you. :smile:

If I am constructing a set of new documents and then indexing them into an index of 32 shards, would ES-Hadoop first identify that there are 32 shards and hence divide the RDD into 32 partitions, or does it initially create partitions on its own?

I tried a simple insertion program in which I put 5 documents into ES, in a single type, on an index of 32 shards. It used 4 cores and ran on a local machine. Surprisingly, only 4 tasks/partitions were created. When I ran it on 2 cores, 2 partitions/tasks were spawned. Is there any way for ES-Hadoop to identify that the target index has 32 shards, and then create 32 partitions and divide the documents among them?

Regarding reading, you say:
"In case of reading, the number of tasks that can work in parallel is
determined by the source parallelism - its number of partitions. So
if you have an RDD with 1 partition, it will likely result in one
task that will write to another RDD down the line. Assuming that RDD
is backed by Elastic - even if the index has 10 shards and thus can
have a parallelism of 10, if the source has only one partition and
there's only one task, there's nothing the connector can do to
increase this number."

Does this mean that if the source is Elasticsearch, it creates as many partitions as there are shards? I understand that in normal Spark, the user has to provide the number of partitions. What happens when reading from ES? In the sample code, it creates 32 partitions by extracting the number of shards from ES.

Thanks


(Costin Leau) #3

The source partition (whether it's Elastic or something else) is just a hint for the runtime - in the end it's the hardware and the scheduler that decide things. For example, you can put ES aside and try to control the number of partitions using a different data source, one local to Spark, to get a better understanding of how it works.
As you noted, even if the source has X partitions, it will be limited by the number of cores that can run in parallel. Spark has some options to potentially create more tasks/threads per core, but this doesn't really help: even if you end up with X tasks, only C of them (2 or 4, one per core) can run at once, so the rest are simply serialized.
In this case you likely need an actual cluster, or a machine with significantly more cores, to actually benefit from being able to run more code at the same time.
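
The effect of the core limit can be sketched with a toy scheduler in
plain Python. Everything here is an assumption for illustration: the
function name `simulate_makespan`, the equal task duration, and the
"give the next task to the first free core" rule. It is not how Spark's
scheduler is implemented; it only shows that tasks beyond the core count
wait their turn.

```python
import heapq


def simulate_makespan(num_tasks, num_cores, task_seconds=1.0):
    """Return the total time to run num_tasks on num_cores in this toy model."""
    # Each heap entry is the time at which one core becomes free.
    free_at = [0.0] * num_cores
    heapq.heapify(free_at)
    finish = 0.0
    for _ in range(num_tasks):
        start = heapq.heappop(free_at)  # earliest available core
        end = start + task_seconds
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish


# 32 tasks (for example, one per shard) on a 4-core and a 2-core machine.
print(simulate_makespan(32, 4))
print(simulate_makespan(32, 2))
```

In this model, 32 one-second tasks finish in 8 seconds on 4 cores and 16
seconds on 2 cores: asking for more tasks than cores changes the queue
length, not the number of tasks running at once.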

I'm not sure what you mean by 'normal' Spark - each RDD can specify its number of partitions, which again is really a hint, not a mandatory requirement. In the case of Elasticsearch, the number of partitions is dictated by the number of shards - whether Spark then creates the same number of tasks or not is completely independent of ES.
In fact, if you pay attention to the logging (and increase its level), you will notice that for each shard, a partition is created and later on consumed by a Spark task.

