Writing Spark DataFrame/Dataset to Elasticsearch


(Phani Kumar Yadavilli) #1

I am trying to write a JavaRDD to Elasticsearch using the saveToEs() method, but I am getting the following exception:

EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'

Code:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.struct;

import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public static void main(String[] args) {
    // Elasticsearch connection settings plus Spark/Hive configuration
    SparkConf conf = new SparkConf()
            .set("es.nodes", "cstg-01")
            .set("es.port", "9200")
            .set("es.scheme", "http")
            .set("spark.sql.warehouse.dir", "/app/SmartAnalytics")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

    SparkSession spark = SparkSession.builder()
            .appName("Spark Job")
            .config(conf)
            .enableHiveSupport()
            .getOrCreate();

    // spark.sql("SELECT * FROM sora_feed.sora_sr_details_temp").show(10);

    /*
    Dataset<Row> sqlDF = spark
            .sql("SELECT * FROM service_request_transformed.sr_bug_detail_text");
    sqlDF.printSchema();

    Dataset<Row> modifiedSR = sqlDF
            .select(col("*"), struct("incident_number").as("srdetails"))
            .groupBy("identifier", "severity", "project", "product", "original_version",
                    "os_version", "version_text", "os_type", "headline", "description",
                    "project_release_text", "integrated_releases_text",
                    "known_fixed_release_text", "status", "duplicate_of",
                    "verified_release_text", "apply_to_text", "to_be_fixed_text",
                    "found", "submitted_on", "last_mod_on", "failed_release_text")
            .agg(collect_list(struct("srdetails")))
            .coalesce(1);
    modifiedSR.write().format("json")
            .save("/app/SmartAnalytics/Apps/TestData/Bug_2_SR");
    */

    // Read the denormalized defect data from Hive
    Dataset<Row> sqlDF = spark
            .sql("SELECT * FROM service_request_transformed.sr_denorm_defects");
    sqlDF.printSchema();

    // Nest the incident numbers under each defect as a list of structs
    Dataset<Row> modifiedSR = sqlDF
            .select(col("*"), struct("incident_number").as("srdetails"))
            .groupBy("defect_number", "defect_title", "defect_submitted_on")
            .agg(collect_list(struct("srdetails")))
            .coalesce(1);

    // Keep a JSON copy of the result for inspection
    modifiedSR.write().format("json")
            .save("/app/SmartAnalytics/Apps/TestData/Defects_2_SR");

    modifiedSR.printSchema();

    JavaRDD<Row> srRDD = modifiedSR.toJavaRDD();

    // Index into Elasticsearch, using defect_number as the document id
    JavaEsSpark.saveToEs(srRDD, "/sr2bug/data", ImmutableMap.of("es.mapping.id", "defect_number"));

    /*
     * Map<String, String> params = Collections.emptyMap();
     * HttpEntity entity = new NStringEntity(inputJson, ContentType.APPLICATION_JSON);
     * try {
     *     Response response = context.getContext().performRequest("POST",
     *             es_index + documentID.toString(), params, entity);
     *     logger.info("The json record is successfully indexed :: " + response.getEntity());
     * } catch (Exception e) {
     *     e.printStackTrace();
     * }
     */
}
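
The exception points at 'es.nodes.wan.only'. If the cluster is only reachable through the host listed in es.nodes (for example behind a load balancer, firewall, or cloud endpoint), that flag would go into the configuration above. This is only a sketch of that one change, with the hostname and port copied from the code and not yet verified against this cluster:

// Same configuration as above, plus the WAN-only flag named in the exception.
// es.nodes.wan.only disables node discovery and routes every request through
// the hosts listed in es.nodes.
SparkConf conf = new SparkConf()
        .set("es.nodes", "cstg-01")
        .set("es.port", "9200")
        .set("es.nodes.wan.only", "true")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");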

(James Baiera) #2

Do you have any more logs for that failure? It can happen for any number of reasons, ranging from a lack of a route to the ES hosts to errors in the local network layer (too many connections, incorrect SSL settings).
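
As a quick way to rule out the routing question, a minimal sketch along these lines (plain JDK, hostname and port taken from your configuration, class name is just a placeholder) can be run on the driver host and on a worker node to see whether the Elasticsearch HTTP endpoint answers at all:

import java.net.HttpURLConnection;
import java.net.URL;

public class EsPing {
    public static void main(String[] args) throws Exception {
        // Hit the Elasticsearch root endpoint, which returns the cluster
        // name and version when the node is reachable over HTTP.
        URL url = new URL("http://cstg-01:9200/");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        System.out.println("HTTP status: " + conn.getResponseCode());
        conn.disconnect();
    }
}

If that succeeds from the driver host but fails from the worker nodes, the executors cannot reach the cluster, which would line up with the version-detection failure.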

