How do I parallelize an ES load operation in Spark using the connector library?

I have a three-node Spark cluster (8 cores and 16 GB RAM each) in standalone mode, and I am using the elasticsearch-hadoop connector to read an ES index. The index is huge, with over 100M documents, 5 shards, and 2 replicas. When I create a DataFrame using Spark, I want the load to be parallelized, but I see that it is handled only on the driver and not on the executors. This single operation alone takes over 8 hours. How can I optimize this and have all the worker nodes load the data in parallel?

I submit the job with 16 executors, each with 1 core and 2 GB of memory, and a driver with 4 GB of memory.
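For reference, a minimal sketch of that resource configuration expressed programmatically rather than as spark-submit flags. The master URL and app name are assumptions, and driver memory still has to be set at submit time, before the JVM starts:

```python
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext

# 16 x 1-core / 2 GB executors on a standalone cluster; names are illustrative.
conf = (SparkConf()
        .setAppName("es-to-parquet")            # hypothetical app name
        .setMaster("spark://node-master:7077")  # assumed standalone master URL
        .set("spark.executor.cores", "1")
        .set("spark.executor.memory", "2g")
        .set("spark.cores.max", "16"))          # caps the app at 16 x 1 cores

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sqlContext = SQLContext(spark.sparkContext)  # matches the sqlContext.read usage below
```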

```python
df = sqlContext.read.format("org.elasticsearch.spark.sql").option("es.nodes", es_ip).load(es_index)
```
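A likely lever here: by default the connector creates one Spark input partition per ES shard, so a 5-shard index is read by at most 5 tasks no matter how many executors are available. With ES 5.x or later, `es.input.max.docs.per.partition` asks the connector to split each shard into sliced scrolls. A sketch, with illustrative rather than tuned values:

```python
# A sketch, not a tuned setup: split each shard into sliced scrolls so
# more than one task per shard can read in parallel (needs ES 5.x+).
df = (sqlContext.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", es_ip)
      .option("es.input.max.docs.per.partition", "1000000")  # ~100 read tasks for 100M docs
      .option("es.scroll.size", "10000")  # documents fetched per scroll request
      .load(es_index))

print(df.rdd.getNumPartitions())  # confirm the read parallelism before writing
```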

I've heard reports of this issue from multiple users but have yet to pin down the actual problem. Could you include the logs from a run as well as a few thread dumps once the job is running to help us see what kind of rut Spark might be getting itself into?
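For the dumps, a rough sketch run on a worker host with jstack should do it (this assumes a JDK on the PATH; the Thread Dump link on the Spark UI's Executors tab shows the same information):

```python
import subprocess

# Find executor JVMs on this worker and dump their threads with jstack.
# pgrep exits non-zero if no executor is running, raising CalledProcessError.
pids = subprocess.check_output(
    ["pgrep", "-f", "CoarseGrainedExecutorBackend"],
    universal_newlines=True).split()
for pid in pids:
    dump = subprocess.check_output(["jstack", pid], universal_newlines=True)
    with open("executor-{}-threads.txt".format(pid), "w") as out:
        out.write(dump)
```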

Thanks @james.baiera for looking into it. I'm on the cusp of moving to another solution, but if this works, it would be the best one.

Here is the stdout of a typical worker process:

2019-04-08 15:17:50 INFO TransportClientFactory:267 - Successfully created connection to node-master/172.30.56.201:41669 after 183 ms (0 ms spent in bootstraps)
2019-04-08 15:17:50 INFO SecurityManager:54 - Changing view acls to: root,hadoop
2019-04-08 15:17:50 INFO SecurityManager:54 - Changing modify acls to: root,hadoop
2019-04-08 15:17:50 INFO SecurityManager:54 - Changing view acls groups to:
2019-04-08 15:17:50 INFO SecurityManager:54 - Changing modify acls groups to:
2019-04-08 15:17:50 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, hadoop); groups with view permissions: Set(); users with modify permissions: Set(root, hadoop); groups with modify permissions: Set()
2019-04-08 15:17:51 INFO TransportClientFactory:267 - Successfully created connection to node-master/172.30.56.201:41669 after 9 ms (0 ms spent in bootstraps)
2019-04-08 15:17:51 INFO DiskBlockManager:54 - Created local directory at /tmp/spark-f9e343e2-6cc4-418d-b999-b2716d784907/executor-9cfe62a3-cdf0-4e46-b6ac-b677a3f7ee7a/blockmgr-490abfa7-2578-4095-8366-74b291fa64c9
2019-04-08 15:17:51 INFO MemoryStore:54 - MemoryStore started with capacity 1048.8 MB
2019-04-08 15:17:51 INFO CoarseGrainedExecutorBackend:54 - Connecting to driver: spark://CoarseGrainedScheduler@node-master:41669
2019-04-08 15:17:51 INFO WorkerWatcher:54 - Connecting to worker spark://Worker@172.30.57.114:35185
2019-04-08 15:17:51 INFO WorkerWatcher:54 - Successfully connected to spark://Worker@172.30.57.114:35185
2019-04-08 15:17:51 INFO TransportClientFactory:267 - Successfully created connection to /172.30.57.114:35185 after 19 ms (0 ms spent in bootstraps)
2019-04-08 15:17:52 INFO CoarseGrainedExecutorBackend:54 - Successfully registered with driver
2019-04-08 15:17:52 INFO Executor:54 - Starting executor ID 9 on host 172.30.57.114
2019-04-08 15:17:52 INFO Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 39155.
2019-04-08 15:17:52 INFO NettyBlockTransferService:54 - Server created on 172.30.57.114:39155
2019-04-08 15:17:52 INFO BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2019-04-08 15:17:52 INFO BlockManagerMaster:54 - Registering BlockManager BlockManagerId(9, 172.30.57.114, 39155, None)
2019-04-08 15:17:52 INFO BlockManagerMaster:54 - Registered BlockManager BlockManagerId(9, 172.30.57.114, 39155, None)
2019-04-08 15:17:52 INFO BlockManager:54 - Initialized BlockManager: BlockManagerId(9, 172.30.57.114, 39155, None)
2019-04-08 15:18:05 INFO CoarseGrainedExecutorBackend:54 - Got assigned task 12
2019-04-08 15:18:05 INFO Executor:54 - Running task 12.0 in stage 0.0 (TID 12)
2019-04-08 15:18:05 INFO Executor:54 - Fetching spark://node-master:41669/jars/elasticsearch-hadoop-6.6.2.jar with timestamp 1554761864368
2019-04-08 15:18:05 INFO TransportClientFactory:267 - Successfully created connection to node-master/172.30.56.201:41669 after 13 ms (0 ms spent in bootstraps)
2019-04-08 15:18:05 INFO Utils:54 - Fetching spark://node-master:41669/jars/elasticsearch-hadoop-6.6.2.jar to /tmp/spark-f9e343e2-6cc4-418d-b999-b2716d784907/executor-9cfe62a3-cdf0-4e46-b6ac-b677a3f7ee7a/spark-579a5436-c268-4b16-a28d-f04a9b1a02ca/fetchFileTemp2866888904293738116.tmp
2019-04-08 15:18:05 INFO Utils:54 - Copying /tmp/spark-f9e343e2-6cc4-418d-b999-b2716d784907/executor-9cfe62a3-cdf0-4e46-b6ac-b677a3f7ee7a/spark-579a5436-c268-4b16-a28d-f04a9b1a02ca/7366662891554761864368_cache to /usr/local/spark-2.4.0-bin-hadoop2.7/work/app-20190408151744-0023/9/./elasticsearch-hadoop-6.6.2.jar
2019-04-08 15:18:05 INFO Executor:54 - Adding file:/usr/local/spark-2.4.0-bin-hadoop2.7/work/app-20190408151744-0023/9/./elasticsearch-hadoop-6.6.2.jar to class loader
2019-04-08 15:18:05 INFO TorrentBroadcast:54 - Started reading broadcast variable 0
2019-04-08 15:18:06 INFO TransportClientFactory:267 - Successfully created connection to /172.30.57.114:39175 after 21 ms (0 ms spent in bootstraps)
2019-04-08 15:18:06 INFO MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 274.4 KB, free 1048.5 MB)
2019-04-08 15:18:06 INFO TorrentBroadcast:54 - Reading broadcast variable 0 took 531 ms
2019-04-08 15:18:06 INFO MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 840.5 KB, free 1047.7 MB)
2019-04-08 15:18:07 INFO FileOutputCommitter:108 - File Output Committer Algorithm version is 1
2019-04-08 15:18:09 INFO SQLHadoopMapReduceCommitProtocol:54 - Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
2019-04-08 15:18:09 INFO FileOutputCommitter:108 - File Output Committer Algorithm version is 1
2019-04-08 15:18:09 INFO SQLHadoopMapReduceCommitProtocol:54 - Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
2019-04-08 15:18:20 INFO CodeGenerator:54 - Code generated in 7786.891257 ms
2019-04-08 15:18:21 INFO CodecConfig:95 - Compression: GZIP
2019-04-08 15:18:21 INFO CodecConfig:95 - Compression: GZIP
2019-04-08 15:18:22 INFO ParquetOutputFormat:376 - Parquet block size to 134217728
2019-04-08 15:18:22 INFO ParquetOutputFormat:377 - Parquet page size to 1048576
2019-04-08 15:18:22 INFO ParquetOutputFormat:378 - Parquet dictionary page size to 1048576
2019-04-08 15:18:22 INFO ParquetOutputFormat:379 - Dictionary is on
2019-04-08 15:18:22 INFO ParquetOutputFormat:380 - Validation is off
2019-04-08 15:18:22 INFO ParquetOutputFormat:381 - Writer version is: PARQUET_1_0
2019-04-08 15:18:22 INFO ParquetOutputFormat:382 - Maximum row group padding size is 8388608 bytes
2019-04-08 15:18:22 INFO ParquetOutputFormat:383 - Page size checking is: estimated
2019-04-08 15:18:22 INFO ParquetOutputFormat:384 - Min row count for page size check is: 100
2019-04-08 15:18:22 INFO ParquetOutputFormat:385 - Max row count for page size check is: 10000
2019-04-08 15:18:23 INFO ParquetWriteSupport:54 - Initialized Parquet WriteSupport with Catalyst schema:
{
  "type" : "struct",
  "fields" : [ {
    "name" : "hxAgentLastAudit",
    "type" : "timestamp",
    "nullable" : true,
    "metadata" : { }
  }, {

<...snip...>

optional int64 wtpSessionUpTime;
optional binary xmlBody (UTF8);
optional int64 xssViol;
}

2019-04-08 15:18:24 INFO CodecPool:153 - Got brand-new compressor [.gz]

I am posting the logs in multiple messages because they exceed the message size limit:

And the Running Query:

parquet at NativeMethodAccessorImpl.java:0

org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:557)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:564)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.base/java.lang.Thread.run(Thread.java:844)

Details of Query 0:

> == Parsed Logical Plan ==
> InsertIntoHadoopFsRelationCommand hdfs://node-master:9000/events/1/2019/03/24.parquet.0408_1520, false, Parquet, Map(compression -> gzip, path -> hdfs://node-master:9000/events/1/2019/03/24.parquet.0408_1520), ErrorIfExists, [hxAgentLastAudit, ,apSysSipStatsActiveSubscriptions, ... 2620 more fields]
> +- Repartition 16, false
>    +- Relation[hxAgentLastAudit#0,lastStep#1, ,apSysSipStatsActiveSubscriptions#98L,... 2620 more fields] ElasticsearchRelation(Map(es.nodes -> localhost:9200, es.nodes.resolve.hostname -> false, es.resource -> event-2019.03.24),org.apache.spark.sql.SQLContext@4ff5e0e8,None)
> 
> == Analyzed Logical Plan ==
> InsertIntoHadoopFsRelationCommand hdfs://node-master:9000/events/1/2019/03/24.parquet.0408_1520, false, Parquet, Map(compression -> gzip, path -> hdfs://node-master:9000/events/1/2019/03/24.parquet.0408_1520), ErrorIfExists, [hxAgentLastAudit,  ,apSysSipStatsActiveSubscriptions, ... 2620 more fields]
> +- Repartition 16, false
>    +- Relation[hxAgentLastAudit#0,lastStep#1, ,apSysSipStatsActiveSubscriptions#98L,... 2620 more fields] ElasticsearchRelation(Map(es.nodes -> localhost:9200, es.nodes.resolve.hostname -> false, es.resource -> event-2019.03.24),org.apache.spark.sql.SQLContext@4ff5e0e8,None)
> 
> == Optimized Logical Plan ==
> InsertIntoHadoopFsRelationCommand hdfs://node-master:9000/events/1/2019/03/24.parquet.0408_1520, false, Parquet, Map(compression -> gzip, path -> hdfs://node-master:9000/events/1/2019/03/24.parquet.0408_1520), ErrorIfExists, [hxAgentLastAudit, , apSysSipStatsActiveSubscriptions, ... 2620 more fields]
> +- Repartition 16, false
>    +- Relation[hxAgentLastAudit#0,lastStep#1, ,apSysSipStatsActiveSubscriptions#98L,... 2620 more fields] ElasticsearchRelation(Map(es.nodes -> localhost:9200, es.nodes.resolve.hostname -> false, es.resource -> event-2019.03.24),org.apache.spark.sql.SQLContext@4ff5e0e8,None)
> 
> == Physical Plan ==
> Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand hdfs://node-master:9000/events/1/2019/03/24.parquet.0408_1520, false, Parquet, Map(compression -> gzip, path -> hdfs://node-master:9000/events/1/2019/03/24.parquet.0408_1520), ErrorIfExists, [hxAgentLastAudit, , apSysSipStatsActiveSubscriptions, ... 2620 more fields]
> +- Coalesce 16
>    +- Scan ElasticsearchRelation(Map(es.nodes -> localhost:9200, es.nodes.resolve.hostname -> false, es.resource -> event-2019.03.24),org.apache.spark.sql.SQLContext@4ff5e0e8,None) [hxAgentLastAudit#0,lastStep#1, ,apSysSipStatsActiveSubscriptions#98L,... 2620 more fields] PushedFilters: [], ReadSchema: struct<hxAgentLastAudit:timestamp,lastStep:string,*:string,ARTClientBytes:bigint,ARTClientPackets...
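
One detail worth flagging in the plans above: `Repartition 16, false` compiles to a physical `Coalesce 16`, and a coalesce can only merge partitions, never create them, so a 5-partition ES scan stays at 5 tasks all the way through the Parquet write. A minimal sketch of the difference (output paths are hypothetical):

```python
# Coalesce (shuffle disabled) cannot raise parallelism: a 5-partition
# scan stays at 5 tasks through the write.
df.coalesce(16).write.parquet("hdfs://node-master:9000/tmp/out_coalesce")

# A shuffling repartition does redistribute rows across 16 write tasks,
# though the ES scan itself still runs one task per input partition.
df.repartition(16).write.parquet("hdfs://node-master:9000/tmp/out_repartition")
```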

Below is the thread dump without details. I have the full dump but can't post it here. Let me know how I can attach the file or send it to you another way...

|Thread ID|Thread Name|Thread State|Thread Locks|
| --- | --- | --- | --- |
|45|Executor task launch worker for task 11|RUNNABLE|Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1861416618})|
|11|Common-Cleaner|TIMED_WAITING||
|33|dispatcher-event-loop-0|WAITING|Lock(java.util.concurrent.ThreadPoolExecutor$Worker@642538759})|
|34|dispatcher-event-loop-1|RUNNABLE|Lock(java.util.concurrent.ThreadPoolExecutor$Worker@138933223})|
|42|driver-heartbeater|TIMED_WAITING||
|46|files-client-6-1|RUNNABLE|Monitor(io.netty.channel.nio.SelectedSelectionKeySet@174833404}), Monitor(java.util.Collections$UnmodifiableSet@1999280936}), Monitor(sun.nio.ch.EPollSelectorImpl@1854478483})|
|3|Finalizer|WAITING||
|148|IPC Client (586679191) connection to node-master/172.30.56.201:9000 from hadoop|TIMED_WAITING||
|51|IPC Parameter Sending Thread #0|TIMED_WAITING||
|53|LeaseRenewer:hadoop@node-master:9000|TIMED_WAITING||
|1|main|TIMED_WAITING||
|36|netty-rpc-env-timeout|TIMED_WAITING||
|49|org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner|WAITING||
|2|Reference Handler|RUNNABLE||
|39|RemoteBlock-temp-file-clean-thread|TIMED_WAITING||
|37|rpc-client-3-1|RUNNABLE||
|48|shuffle-client-4-1|RUNNABLE|Monitor(io.netty.channel.nio.SelectedSelectionKeySet@141324612}), Monitor(java.util.Collections$UnmodifiableSet@1596437227}), Monitor(sun.nio.ch.EPollSelectorImpl@2088206844})|
|41|shuffle-server-5-1|RUNNABLE|Monitor(io.netty.channel.nio.SelectedSelectionKeySet@454428111}), Monitor(java.util.Collections$UnmodifiableSet@960004241}), Monitor(sun.nio.ch.EPollSelectorImpl@381808505})|
|4|Signal Dispatcher|RUNNABLE||
|52|Thread-6|TIMED_WAITING||
