Ingesting from Hive to ElasticSearch

Hi,

I am using a scala program to load an index in Hive to Elastic search. I run this job as a spark-submit command with options --driver-memory 12000m --driver-cores 15 --num-executors 20. There are 60000000 records in the Hive table( around 200GB). The job has been running for more than 24 hrs now, with only this docs loaded

size: 26.1Gi (52.2Gi)
docs: 68,178,464 (136,351,928)

Any ways to improve this ingestions process? The index has a replication factor 2.

Scala program to ingest to ES:

val conf = new SparkConf().setAppName("FASApplication").setMaster("local");

val sc = new SparkContext(conf);
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
//val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val dataFrame = sqlContext.sql("select columns from np_fas.table");

val elasticOptions = Map("es.nodes" -> "171.68.203.75","es.port" -> "9200","es.index.auto.create" -> "no", "es.nodes.wan.only" -> "true" ,"es.net.http.auth.user" -> "admin","es.net.http.auth.pass" -> "Hadop123 ");

val dataFrameLayout =  dataFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Append).options(elasticOptions).save("fas_reporting_device_data_oct_2016/fas_type_family")

If you are not running in a cloud environment, that I might suggest disabling wan only mode. This makes your jobs write only to the provided node addresses, which in this case is only one address.

Thanks James for your reply. I disabled wan only mode. And I changed below code as well:

val conf = new SparkConf().setAppName("FASApplication").setMaster("local");

to

val conf = new SparkConf().setAppName("FASApplication");

However the problem was with the spark-submit command. I did not have --master yarn --deploy-mode cluster. After adding this options my job was shared by 3 cluster nodes.

@vsdeepthi Ah, I skipped right over the local mode option. Good to hear it's working now. Cheers!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.