I am trying to write a JavaRDD to Elasticsearch using the saveToEs() method, but I get the following exception:
EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
Code:
/**
 * Reads the denormalized SR/defect table from Hive and groups it per defect. Each group collects its
 * incident numbers into a list. The grouped result is written as JSON and indexed into Elasticsearch.
 *
 * @param args unused
 */
public static void main(String[] args) {
    SparkConf conf = new SparkConf()
            // NOTE(review): "Cannot detect ES version" means es-hadoop could not reach es.nodes:es.port
            // from the driver/executors. Confirm that "cstg-01" resolves and port 9200 is open from every
            // executor host. For cloud/WAN/container-hosted clusters, also pass
            // --conf spark.es.nodes.wan.only=true.
            .set("es.nodes", "cstg-01")
            .set("es.port", "9200")
            .set("es.scheme", "http")
            // Previously misspelled "spark.sql.wareh0use.dir" (digit zero), so the setting never applied.
            .set("spark.sql.warehouse.dir", "/app/SmartAnalytics")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

    SparkSession spark = SparkSession.builder()
            .appName("Spark Job")
            .config(conf)
            .enableHiveSupport()
            .getOrCreate();

    Dataset<Row> sqlDF = spark.sql("Select * from service_request_transformed.sr_denorm_defects");
    sqlDF.printSchema();

    // One output row per defect. Each incident number is wrapped in a struct ("srdetails"), and the
    // structs are collected into a list per group.
    Dataset<Row> modifiedSR = sqlDF
            .select(col("*"), struct("incident_number").as("srdetails"))
            .groupBy("defect_number", "defect_title", "defect_submitted_on")
            .agg(collect_list(struct("srdetails")))
            .coalesce(1);

    modifiedSR.write().format("json").save("/app/SmartAnalytics/Apps/TestData/Defects_2_SR");
    modifiedSR.printSchema();

    // Save the Dataset through es-hadoop's Spark SQL integration so each Row is serialized using its
    // schema. The RDD API (JavaEsSpark) targets Maps/JavaBeans, not Rows.
    // The resource uses the documented "index/type" form (no leading slash).
    org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL.saveToEs(
            modifiedSR, "sr2bug/data", ImmutableMap.of("es.mapping.id", "defect_number"));
}