Using es-hadoop with Zeppelin and the use of es.nodes

Hi,

Following a conversation on a pull request for Zeppelin, @costin asked for a thread here to clarify things.
(https://github.com/apache/zeppelin/pull/1970)

I'm trying to use es-hadoop with Zeppelin. I added the jar properly, but when I tried to define es.nodes in Zeppelin's Spark interpreter settings, it didn't seem to work.
The reason turned out to be that Zeppelin recently made a change to stop propagating any configuration value that doesn't start with "spark." into the SparkConf.
The reasoning was that although you can pass such configuration values into SparkConf via code at initialization, Spark allegedly does not propagate non-spark configuration values to the executors anyway (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L372).
Bottom line: starting with Zeppelin 0.7.1, there is no way to use es-hadoop.

The question then is: how does es-hadoop use the es.nodes value, and does it or does it not transfer that value to all nodes? What I'd like to figure out is whether es-hadoop is misusing SparkConf by expecting a non-spark.* value in it, or whether this is a needless restriction on the Zeppelin side.
Your help is greatly appreciated.

I am curious to know whether anyone has made es-hadoop work on Spark and Zeppelin. If yes, please specify the Spark and Zeppelin versions, because I think it should never work, since Spark won't propagate non-spark.* configuration to the executors.

I have been successful (calling Dataset.saveToEs()) on Zeppelin 0.7.1 by replacing the interpreter jar with the one from version 0.7.0. I'm using Spark 2.1.1.
Once es.nodes appears in sc.getConf, it all works fine.
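
For reference, here's a quick way to check that in a Zeppelin Spark paragraph (a minimal sketch; nothing es-hadoop-specific is assumed beyond the es.nodes key itself):

// Confirm that the interpreter setting actually reached SparkConf.
// Prints the configured value, or "not set" if Zeppelin dropped it.
println(sc.getConf.getOption("es.nodes").getOrElse("not set"))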

Do you have sample code for that?

import org.elasticsearch.spark.sql._
val mydate = "2017-05-28"
val df1 = spark.read.parquet("s3://mybucket/ingestDate=" + mydate)
// some transformations are done here, like filters, joins etc.
df1.saveToEs("myindex/mytype")

It's that simple. Just add elasticsearch-hadoop-5.4.1.jar to the dependencies and set "es.nodes" to your Elasticsearch server:port. If it's in AWS, make sure to also add the configuration es.nodes.wan.only=true.
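
For completeness, the connection settings can also be passed per write instead of through the interpreter settings, which sidesteps the SparkConf question entirely. A sketch, reusing df1 from the snippet above; the host is a placeholder:

import org.elasticsearch.spark.sql._

// Pass the es-hadoop options directly to the write call.
df1.saveToEs("myindex/mytype", Map(
  "es.nodes"          -> "my-es-host:9200",  // placeholder host:port
  "es.nodes.wan.only" -> "true"              // for setups where only a public/WAN address is reachable
))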

You'll see an immediate difference between 0.7.0 and 0.7.1 even without an ES server: in 0.7.1 it will complain that it can't connect to 127.0.0.1 regardless of what you write in "es.nodes", whereas in 0.7.0 it will attempt to connect to the specified address.

Hello everyone, let me take a stab at clarifying some of these questions:

ES-Hadoop uses this setting as the originally configured set of nodes to communicate with. These nodes are used to discover the full set of addresses that the cluster's nodes reside at. If node discovery is disabled, then the addresses in this property will be used as the only communication point with Elasticsearch.
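
To make the two modes concrete, here is a sketch using the documented setting names (hosts are placeholders):

// Discovery enabled (the default): es.nodes only seeds the client,
// which then discovers the rest of the cluster on its own.
val seeded = Map("es.nodes" -> "seed-node:9200")

// Discovery disabled: only the listed addresses are ever contacted
// (useful behind a proxy/load balancer or in WAN setups).
val pinned = Map(
  "es.nodes"           -> "node-1:9200,node-2:9200",
  "es.nodes.discovery" -> "false"
)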

ES-Hadoop sends its configs to the Spark cluster as a string field on a writer object, leveraging the Serializable trait. We pull the configurations on the driver side and save them as a string. We do not rely on the settings that Spark transmits to the workers; all of the state we need is packaged with the serialized writer class that is called with runJob.
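
Illustratively, the pattern being described looks something like the following. This is not es-hadoop's actual code, just a sketch: a serializable object that carries its settings as a plain string travels to the executors with the task closure, independent of what SparkConf propagates.

// Not es-hadoop's real classes -- a minimal sketch of the pattern described above.
class SettingsCarrier(val settings: String) extends Serializable {
  def writePartition(rows: Iterator[String]): Unit = {
    // parse `settings` and talk to the external system here
    rows.foreach(_ => ())
  }
}

val carrier = new SettingsCarrier("es.nodes=my-es-host:9200")  // placeholder value
sc.parallelize(Seq("a", "b")).foreachPartition(rows => carrier.writePartition(rows))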

Spark generally only likes dealing with settings that start with the spark. prefix. A snippet from our documentation highlights this fact:

Command-line: For those that want to set the properties through the command-line (either directly or by loading them from a file), note that Spark only accepts those that start with the "spark." prefix and will ignore the rest (and depending on the version a warning might be thrown). To work around this limitation, define the elasticsearch-hadoop properties by appending the spark. prefix (thus they become spark.es.) and elasticsearch-hadoop will automatically resolve them:
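
In practice that workaround looks something like this (a sketch; the host is a placeholder). In Zeppelin you would add the prefixed keys to the Spark interpreter settings; when you control the SparkConf yourself, the equivalent is:

// Interpreter settings / spark-defaults.conf / --conf: use the prefixed keys, e.g.
//   spark.es.nodes          = my-es-host:9200
//   spark.es.nodes.wan.only = true

// Equivalent when building the conf in code; es-hadoop resolves the spark.es.* keys itself.
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .set("spark.es.nodes", "my-es-host:9200")
  .set("spark.es.nodes.wan.only", "true")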

@orenpai If you set spark.es.nodes instead of es.nodes, does the job successfully kick off?

Thank you! That indeed did the trick.
