Ingesting from Hive to Elasticsearch


I am using a Scala program to load data from a Hive table into an Elasticsearch index. I run this job with spark-submit using the options --driver-memory 12000m --driver-cores 15 --num-executors 20. The Hive table has 60,000,000 records (around 200 GB). The job has now been running for more than 24 hours, and only these docs have been loaded so far:

size: 26.1Gi (52.2Gi)
docs: 68,178,464 (136,351,928)

Are there any ways to improve this ingestion process? The index has a replication factor of 2.
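For scale, the figures above correspond to a very low write rate. A minimal sketch of the arithmetic, assuming exactly 24 hours have elapsed (the post says more than 24, so the true rate is lower still) and assuming the first number in each pair covers primary shards only, with the parenthesized total including replicas:

```scala
// Back-of-the-envelope ingestion rate from the stats in the question.
// Assumptions: elapsed time is exactly 24 h (an upper bound on the rate),
// and 68,178,464 docs / 26.1 GiB are primary-shard figures only.
object IngestRate {
  val primaryDocs: Long      = 68178464L // docs on primary shards
  val primarySizeGiB: Double = 26.1      // primary store size in GiB
  val elapsedSeconds: Long   = 24L * 60 * 60

  val docsPerSecond: Double = primaryDocs.toDouble / elapsedSeconds
  val mibPerSecond: Double  = primarySizeGiB * 1024 / elapsedSeconds

  def main(args: Array[String]): Unit =
    println(f"~$docsPerSecond%.0f docs/s, ~$mibPerSecond%.2f MiB/s")
}
```

Running it prints `~789 docs/s, ~0.31 MiB/s`.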

Scala program to ingest to ES:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

// "local" runs Spark with one worker thread inside the driver JVM
val conf = new SparkConf().setAppName("FASApplication").setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
//val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val dataFrame = sqlContext.sql("select columns from np_fas.table")

val elasticOptions = Map("es.nodes" -> "", "es.port" -> "9200", "" -> "no", "es.nodes.wan.only" -> "true", "" -> "admin", "" -> "Hadop123 ")

dataFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Append)
  .options(elasticOptions).save("fas_reporting_device_data_oct_2016/fas_type_family")

If you are not running in a cloud environment, I would suggest disabling WAN-only mode. WAN-only mode makes your job write only to the node addresses you provide, which in this case is a single address.
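A minimal sketch of write options with WAN-only mode turned off. The `es.*` keys are standard elasticsearch-hadoop settings; the host names are hypothetical placeholders, and the batch-size values are illustrative starting points for tuning, not recommendations:

```scala
// Sketch of elasticsearch-hadoop write options with WAN-only mode disabled.
// Host names below are hypothetical; batch values are illustrative only.
object EsWriteOptions {
  val options: Map[String, String] = Map(
    // several seed nodes; with discovery on, the connector finds the rest
    "es.nodes"           -> "es-node-1,es-node-2,es-node-3",
    "es.port"            -> "9200",
    // false is the default: the connector is not limited to the listed nodes
    "es.nodes.wan.only"  -> "false",
    "es.nodes.discovery" -> "true",
    // bulk request size per task (defaults are 1000 entries / 1mb)
    "es.batch.size.entries" -> "5000",
    "es.batch.size.bytes"   -> "5mb",
    // do not trigger an index refresh after each bulk write
    "es.batch.write.refresh" -> "false"
  )

  def main(args: Array[String]): Unit =
    options.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
}
```

The map would be passed to the writer the same way as in the question, e.g. `dataFrame.write.format("org.elasticsearch.spark.sql").options(EsWriteOptions.options)`, merged with whatever authentication settings the cluster needs.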

Thanks James for your reply. I disabled WAN-only mode, and I also changed this line:

val conf = new SparkConf().setAppName("FASApplication").setMaster("local")

to:

val conf = new SparkConf().setAppName("FASApplication")

However, the real problem was the spark-submit command: I had not passed --master yarn --deploy-mode cluster. After adding these options, the job was distributed across 3 cluster nodes.
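For reference, a sketch of the full spark-submit invocation combining the flags mentioned in this thread, built as a Scala argument list so it can be launched with `scala.sys.process`. The main class and jar name are hypothetical placeholders. Spark gives properties set directly on a `SparkConf` precedence over spark-submit flags, so `--master yarn` only takes effect once `setMaster("local")` is removed from the code:

```scala
// Sketch of the spark-submit command from this thread.
// The --class value and jar path are hypothetical placeholders.
object SubmitCommand {
  val command: Seq[String] = Seq(
    "spark-submit",
    "--master", "yarn",          // run on the YARN cluster, not in local mode
    "--deploy-mode", "cluster",  // the driver also runs inside the cluster
    "--driver-memory", "12000m",
    "--driver-cores", "15",      // only honored in cluster deploy mode
    "--num-executors", "20",
    "--class", "com.example.FASApplication", // hypothetical main class
    "fas-application.jar"                   // hypothetical application jar
  )

  def main(args: Array[String]): Unit = {
    println(command.mkString(" "))
    // To actually launch it: import scala.sys.process._; command.!
  }
}
```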

@vsdeepthi Ah, I skipped right over the local mode option. Good to hear it's working now. Cheers!

