Error when multiple calls to writeToES


#1

Hi
Using : elastic 5.4.0, spark-2.1.1-bin-hadoop2.7 and elasticsearch-hadoop-5.4.0 on MacOS, I need to write 3 Dataset in my ES.
I use the writeToES method, 3 times... If I write only 1 of the 3 Datasets, everything if fine. As far as I try to write 2 datasets, I get the following error (note that the first dataset is well wriitten in es).
I'm working locally on my laptop, same code and config work fine on others environments (linux).
If so has an idea where the issue is, it will great :slight_smile:
Thanks in advance
Pascal

2017-05-31 13:59:54 ERROR NetworkClient - Node [127.0.0.1:9200] failed (Connection reset); no other nodes left - aborting...

2017-05-31 13:59:54 ERROR Executor - Exception in task 20.0 in stage 3.0 (TID 9)
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:283)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:572)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:58)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:91)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:91)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[127.0.0.1:9200]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:150)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:461)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:425)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:429)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:155)
at org.elasticsearch.hadoop.rest.RestClient.remoteEsVersion(RestClient.java:627)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:276)
... 10 more
2017-05-31 13:59:54 ERROR TaskSetManager - Task 20 in stage 3.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 3.0 failed 1 times, most recent failure: Lost task 20.0 in stage 3.0 (TID 9, localhost, executor driver): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:283)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:572)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:58)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:91)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:91)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[127.0.0.1:9200]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:150)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:461)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:425)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:429)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:155)
at org.elasticsearch.hadoop.rest.RestClient.remoteEsVersion(RestClient.java:627)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:276)
... 10 more


(James Baiera) #2

Could you include the code that you are using to submit the jobs?


#3

Hi
A bit tricky, we have code in a lot a classes :-s
I try, hope it is enough.
Many thanks
Pascal

...
conf.set("es.nodes", "");
conf.set("es.port", esPort);
conf.set("es.index.auto.create", "true");
...
sparkContext = new JavaSparkContext(conf);
SparkSession sparkSession = sparkSQLContextHandler.getContext();

	Dataset<Row> callDs = CallService.javaRddToDataset(this.getCallJavaRdd(), Call.class);
	Dataset<Row> calleeDs = callDs.select("userid", "date_date_appel")
			.distinct()
			.withColumn("idEs", functions.concat_ws("_", callDs.col("date_date_appel"), callDs.col("userid")));

	CallService.writeToEs(callDs, callIndexAndType);
	CallService.writeToEs(calleeDs, calleeIndexAndType, "append", "idEs");

With

public void writeToEs(Dataset<Row> dataset, String esIndex) {
	String mode = "append";
	this.writeToEs(dataset, esIndex, mode);	
}

@Override
public void writeToEs(Dataset<Row> dataset, String esIndex, String mode, String id) {
	dataset.write()
	.format("org.elasticsearch.spark.sql")
	.option("es.index.auto.create", "true")
	.option("es.resource.auto.create",esIndex)
	.option("es.mapping.id", id)
	.mode(mode)
	.save(esIndex);
	
}

private void setCallJavaRdd() {
	@SuppressWarnings("unchecked")
	JavaRDD<Call> callRdd = (JavaRDD<Call>) CallService.getPojoRdd("call", "Call");
	this.callJavaRdd = callRdd;
	this.callJavaRdd.cache();
}

public Dataset<Row> javaRddToDataset(JavaRDD<?> dataJavaRdd, Class<?> clazz) {
	SparkSQLContextHandler sparkSQLContextHandler = (SparkSQLContextHandler) ApplicationFactory.getBean("sparkSQL");
	SparkSession sparkSession = sparkSQLContextHandler.getContext();
	Dataset<Row> dataset = sparkSession.createDataFrame(dataJavaRdd, clazz);
	return dataset;
}

(James Baiera) #4

I'm not sure I see where you are setting the nodes for the job, Are they being configured in a different part of the project?


#5

Hi
I guess it is part of context initiation:

	SparkConf conf = new SparkConf().setAppName("Spark App").setMaster("local[*]");

	this.propertyHandler = (PropertyHandler) ApplicationFactory.getBean("property");
	String esHost = propertyHandler.getValue("es.host");
	String esPort = propertyHandler.getValue("es.port");

	// Ajout de la conf ES au contexte Spark
	conf.set("es.nodes", esHost);
	conf.set("es.port", esPort);
	conf.set("es.index.auto.create", "true");

	// Création du contexte Spark
	sparkContext = new JavaSparkContext(conf);

esHost is equal to localhost in the property file.


(system) #6

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.