Read ES Index from Spark Executors

(Nirav Vira) #1

ES: 2.2.0
Spark: 1.6
Scala: 2.10
ElasticHadoop: elasticsearch-hadoop 2.2.0 (I can use elasticsearch-spark-2.10 if needed)
Kafka: 0.9.0
Host: CDH5.7 VM
Spark Streaming job description: Read data from a Kafka topic. For each unique Id, read the correlated data from an ES index/type, aggregate it, and store the result in another index/type for search.

The SparkContext is created on the driver along with the ES settings.

I am able to read from ES fine as long as I call .collect and move the execution to the Spark driver.

Question: How can I make the read happen on the Spark executors so I can leverage the parallelism? Googling suggests using a ConnectionPool at the RDD partition level for connections to the likes of Cassandra, etc.

How do I get a connection pool going for ES? Is there any concrete implementation or documentation for a connection pool that can be used on the executors?

code snippet

val stream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, offsets, messageHandler)

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    /* This works fine, so connectivity from the driver to ES is good:
    val myrdd = sc.esRDD("someindex/sometype", "?q=EntityId:993b0000-e516-6c3b-79a9-08d349e3fd34")
    myrdd.foreach(x => println("item details" + x._2))
    */
    rdd.collect().foreach { item =>
      // ... some operation
      val entityId = kafkaMessage.entityId // kafkaMessage is resolved through code not shown here
      val esDataRDD = sc.esRDD("someindex/sometype", "?q=EntityId:" + entityId)
      // ...
    }
  }
}

(Nicolas Phung) #2


I'm also interested in whether these best practices can be applied with elasticsearch-spark. It looks like we are running into trouble with several Spark Streaming jobs writing to Elasticsearch with elasticsearch-spark 2.3.1:

code snippet

This happens when we have many Spark Streaming jobs, and it seems to impact existing jobs that use elasticsearch-spark to write into Elasticsearch. Does anyone have any idea?


(Costin Leau) #3

Streaming is a special case: conceptually it's a long-running process, but Spark is actually batching, so the micro-batching strategy means a series of small batch tasks that keep writing to ES.
As there's no proper API for that, the Spark docs suggest keeping the connections alive and passing them to each batch in order to avoid creating them again every time.
That works with a programmatic approach (open a connection, write some data, check, close) but fails with a declarative one (take this data and save it).
Spark 2 looks to be introducing/changing some APIs, and once they are finalized, we'll look into providing hooks for reusing connections.
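
For illustration, here is a minimal sketch of the "programmatic" connection-reuse pattern mentioned above, applied to reads. It is not part of elasticsearch-hadoop: it assumes the Elasticsearch 2.x Java TransportClient is added as a separate dependency, and the names EsClientHolder and ExecutorSideLookup, the cluster name, the host, and the port are all hypothetical placeholders. The client lives in a Scala object, so it is created lazily once per executor JVM and is never serialized with the closure.

```scala
import java.net.InetAddress

import org.apache.spark.streaming.dstream.DStream
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.index.query.QueryBuilders

// Hypothetical holder: one client per executor JVM, created on first use
// and shared by every task that runs in that JVM.
object EsClientHolder {
  // Assumed values; replace with your own cluster name and a reachable node.
  private val clusterName = "elasticsearch"
  private val host = "localhost"
  private val port = 9300

  lazy val client: TransportClient = {
    val settings = Settings.settingsBuilder().put("cluster.name", clusterName).build()
    val c = TransportClient.builder().settings(settings).build()
      .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port))
    sys.addShutdownHook(c.close()) // release the connections when the executor exits
    c
  }
}

object ExecutorSideLookup {
  // entityIds: ids already extracted from the Kafka messages (extraction not shown).
  def lookup(entityIds: DStream[String]): Unit =
    entityIds.foreachRDD { rdd =>
      rdd.foreachPartition { ids =>
        // Resolved on the executor; the object itself is not captured by the closure.
        val client = EsClientHolder.client
        ids.foreach { id =>
          val response = client.prepareSearch("someindex")
            .setTypes("sometype")
            .setQuery(QueryBuilders.matchQuery("EntityId", id))
            .get()
          // Output goes to the executor's stdout, not the driver's.
          response.getHits.getHits.foreach(hit => println("item details " + hit.getSourceAsString))
        }
      }
    }
}
```

This issues one search per id, so it only makes sense when the number of ids per partition is modest.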

(Nirav Vira) #4

Costin, thanks for your reply. My use case is about reading from ES as well. I am wondering if there are currently any options (with or without a connection pool) to call .esRDD without collect, thereby running the code on the Spark executors. Any samples are greatly appreciated.
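
One possible approach, sketched here under stated assumptions rather than as a confirmed recommendation: sc.esRDD has to be invoked on the driver because the SparkContext is not usable inside executor code, but the read it defines is carried out by tasks on the executors. So instead of one esRDD call per message, the ids of a micro-batch can be collected (only the ids, not the payloads), turned into a single terms query, and the resulting RDD joined with the batch. The names BatchEsJoin, termsQuery, and joinWithEs are hypothetical, and the sketch assumes EntityId is a not_analyzed field whose values need no JSON escaping (for example UUIDs).

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._

object BatchEsJoin {
  // Builds a query DSL string matching any of the given ids.
  // Assumes ids contain no characters that need JSON escaping.
  def termsQuery(field: String, ids: Seq[String]): String =
    ids.map(id => "\"" + id + "\"")
      .mkString("{\"query\":{\"terms\":{\"" + field + "\":[", ",", "]}}}")

  // batch: (entityId, payload) pairs from one micro-batch.
  // Yields (entityId, (payload, esDocument)) for every matching ES document.
  def joinWithEs(sc: SparkContext, batch: RDD[(String, String)]) = {
    // Only the distinct ids travel to the driver.
    val ids = batch.keys.distinct().collect()
    // Defined on the driver, executed by tasks on the executors.
    val esDocs = sc.esRDD("someindex/sometype", termsQuery("EntityId", ids))
      .flatMap { case (_, doc) => doc.get("EntityId").map(v => (v.toString, doc)) }
    batch.join(esDocs)
  }
}
```

Inside foreachRDD this could be called as joinWithEs(rdd.sparkContext, rdd) after the !rdd.isEmpty check; the aggregation and the write to the target index/type would then operate on the joined RDD without any collect of the payloads.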
