Writing a Spark DataFrame/Dataset to Elasticsearch

I am trying to write a JavaRDD to Elasticsearch using the saveToEs() method, but I am getting the exception:

EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
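This error typically means elasticsearch-hadoop could not reach the cluster's root endpoint to detect the version. When the ES nodes are only reachable at the addresses listed in `es.nodes` (typical for WAN/cloud setups), the message itself suggests setting `es.nodes.wan.only`. A minimal sketch of the extra connector settings, assuming the host/port from the code below and that WAN mode is actually what the cluster needs:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EsWanSettings {
    // Connector settings to merge into SparkConf when the ES nodes are only
    // reachable via the addresses in es.nodes (disables node discovery).
    static Map<String, String> wanSettings() {
        Map<String, String> s = new LinkedHashMap<>();
        s.put("es.nodes.wan.only", "true"); // talk only to es.nodes, skip discovery
        s.put("es.nodes", "cstg-01");
        s.put("es.port", "9200");
        return s;
    }

    public static void main(String[] args) {
        wanSettings().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With SparkConf, each entry would be applied via `.set(key, value)` alongside the existing `es.nodes`/`es.port` settings.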

Code:

public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.set("es.nodes", "cstg-01")
				.set("es.port", "9200")
				.set("es.scheme", "http")
				.set("spark.sql.warehouse.dir", "/app/SmartAnalytics")
				.set("spark.serializer",
						"org.apache.spark.serializer.KryoSerializer");
		SparkSession spark = SparkSession.builder().appName("Spark Job")
				.config(conf).enableHiveSupport().getOrCreate();
		// spark.sql("SELECT * FROM sora_feed.sora_sr_details_temp").show(10);
		/*
		Dataset<Row> sqlDF = spark
				.sql("Select * from service_request_transformed.sr_bug_detail_text");
		sqlDF.printSchema();

		Dataset<Row> modifiedSR = sqlDF
				.select(col("*"), struct("incident_number").as("srdetails"))
				.groupBy("identifier", "severity", "project", "product",
						"original_version", "os_version", "version_text", "os_type",
						"headline", "description", "project_release_text",
						"integrated_releases_text", "known_fixed_release_text",
						"status", "duplicate_of", "verified_release_text",
						"apply_to_text", "to_be_fixed_text", "found",
						"submitted_on", "last_mod_on", "failed_release_text")
				.agg(collect_list(struct("srdetails")))
				.coalesce(1);
		modifiedSR.write().format("json")
				.save("/app/SmartAnalytics/Apps/TestData/Bug_2_SR");
		*/
		
		
		Dataset<Row> sqlDF = spark
				.sql("Select * from service_request_transformed.sr_denorm_defects");
		sqlDF.printSchema();
		Dataset<Row> modifiedSR = sqlDF
				.select(col("*"), struct("incident_number").as("srdetails"))
				.groupBy("defect_number", "defect_title", "defect_submitted_on")
				.agg(collect_list(struct("srdetails")))
				.coalesce(1);
		modifiedSR.write().format("json")
				.save("/app/SmartAnalytics/Apps/TestData/Defects_2_SR");

		modifiedSR.printSchema();

		JavaRDD<Row> srRDD = modifiedSR.toJavaRDD();

		// Index path is <index>/<type>; defect_number becomes the document _id
		JavaEsSpark.saveToEs(srRDD, "/sr2bug/data",
				ImmutableMap.of("es.mapping.id", "defect_number"));
		
		/*
		Map<String, String> params = Collections.emptyMap();
		HttpEntity entity = new NStringEntity(inputJson, ContentType.APPLICATION_JSON);
		try {
			Response response = context.getContext().performRequest("POST",
					es_index + documentID.toString(), params, entity);
			logger.info("The json record is successfully indexed :: "
					+ response.getEntity());
		} catch (Exception e) {
			e.printStackTrace();
		}
		*/

	}

Do you have any more logs for that failure? This failure can occur for any number of reasons, ranging from a lack of a route to the ES hosts to errors in the local network layer (too many connections, incorrect SSL settings).
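In the absence of more logs, one quick sanity check is to hit the cluster's root endpoint from the machine running the Spark driver, since that is the endpoint the connector probes to detect the version. A rough sketch, assuming the host and port from the code above:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class EsPing {
    // Build the root endpoint URL that elasticsearch-hadoop probes on startup.
    static String rootUrl(String host, int port) {
        return "http://" + host + ":" + port + "/";
    }

    public static void main(String[] args) {
        String url = rootUrl("cstg-01", 9200);
        System.out.println("Probing " + url);
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setConnectTimeout(5000);
            conn.setReadTimeout(5000);
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line); // response JSON should include a "version" field
                }
            }
        } catch (Exception e) {
            // A DNS or connect failure here means Spark cannot reach the cluster either.
            System.out.println("Cannot reach cluster: " + e);
        }
    }
}
```

If this fails from the driver host, the problem is connectivity (routing, firewall, DNS) rather than the Spark job itself.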
