Hi Team,
We have a requirement to load a Parquet file from an S3 bucket and ingest the data into AWS Elasticsearch using a Spark (Scala) job.
To try this out, I installed Elasticsearch on my local machine, and I am able to ingest the data into that local Elasticsearch instance using the Spark (Scala) job.
Local Machine:
Here is the working code that connects to the local Elasticsearch:
import org.apache.spark.sql.{SaveMode, SparkSession}

object ElasticSearchWriteSample {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("WriteToElasticSearch").master("local[*]").getOrCreate()
    sparkSession.conf.set("spark.sql.parquet.binaryAsString", "true")
    val filePath = args(0) // Parquet file path, passed as the first command-line argument
    val dataFrame = sparkSession.read.parquet(filePath)
    dataFrame.write
      .format("org.elasticsearch.spark.sql")
      .option("es.port", "9200")
      .option("es.nodes", "localhost")
      .mode(SaveMode.Append)
      .save("tcrref_testdb/doc")
  }
}
With this job running on my local system, the data is pushed into the local Elasticsearch instance successfully.
In AWS:
I tried the same thing by running the Spark job on EMR, where it should connect to the AWS Elasticsearch instance. However, the job is unable to connect to the Elasticsearch instance running in AWS.
Here is the code:
import org.apache.spark.sql.{SaveMode, SparkSession}

object ElasticSearchWriteSample {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("WriteToElasticSearch").master("cluster").getOrCreate()
    sparkSession.conf.set("spark.sql.parquet.binaryAsString", "true")
    val filePath = args(0) // Parquet file path, passed as the first command-line argument
    val dataFrame = sparkSession.read.parquet(filePath)
    dataFrame.write
      .format("org.elasticsearch.spark.sql")
      .option("es.port", "9200")
      .option("es.nodes", "https://*****.amazonaws.com")
      .option("es.nodes.wan.only", "true")
      .mode(SaveMode.Append)
      .save("tcrref_testdb/doc")
  }
}
This AWS job is not working.
Could you please help us connect to the AWS Elasticsearch instance from EMR?
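Two things in the AWS version may be worth checking. First, "cluster" is not a master URL that Spark recognizes; on EMR the master is normally supplied by spark-submit (for example --master yarn), so the .master(...) call can usually be dropped. Second, Amazon Elasticsearch Service domain endpoints serve HTTPS on port 443 rather than 9200. The following is only a minimal sketch of how the connector options could be derived from the endpoint URL. The object name EsOptionsSketch, the function esOptions, and the endpoint used in the usage note are hypothetical placeholders, not part of the original job, and the sketch does not cover domain access policies or request signing.

```scala
import java.net.URI

// Hypothetical helper: derives elasticsearch-hadoop options from an endpoint URL.
object EsOptionsSketch {
  def esOptions(endpoint: String): Map[String, String] = {
    val uri = new URI(endpoint)
    // Treat an https:// endpoint as requiring TLS.
    val useSsl = uri.getScheme == "https"
    // Use the explicit port if one is given, otherwise the scheme's default.
    val port =
      if (uri.getPort != -1) uri.getPort
      else if (useSsl) 443
      else 80
    Map(
      "es.nodes" -> uri.getHost,        // host only, without the scheme
      "es.port" -> port.toString,
      "es.nodes.wan.only" -> "true",    // connect only through the declared endpoint
      "es.net.ssl" -> useSsl.toString   // enable TLS in the connector
    )
  }
}
```

Assuming a placeholder endpoint such as "https://search-mydomain.us-east-1.es.amazonaws.com", the options could then be passed to the existing write with dataFrame.write.format("org.elasticsearch.spark.sql").options(EsOptionsSketch.esOptions(endpoint)).mode(SaveMode.Append).save("tcrref_testdb/doc").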