Hello Everyone,
I need help with the issue below.
I am indexing a nested JSON document through ES-Hadoop, but it is failing with the error below:
org.apache.spark.util.TaskCompletionListenerException: Could not write all entries for bulk operation [1/1]. Error sample (first [5] error messages):
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: mapper_parsing_exception: failed to parse;org.elasticsearch.hadoop.rest.EsHadoopRemoteException: not_x_content_exception: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes
{"index":{}}
I am using the code below to write the document into AWS Elasticsearch. In this code, nonModifiedDataFrame always has only one record. documentPath points to a single JSON file (around 100 MB), and I have to add indexid and indextimestamp as two extra columns. If I use sparkContext.textFile(documentPath) to create an RDD[String], I am unable to add those two columns to the JSON, so I took the DataFrame approach instead. I also clean up the JSON by replacing dots with underscores using the method replaceDotsWithUnderScore (a rough sketch of that step is shown after the code).
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import org.elasticsearch.spark.rdd.EsSpark

def indexThroughSpark(spark: SparkSession, indexid: String, documentPath: String,
                      indexName: String, indextimestamp: String): Unit = {
  import spark.implicits._

  // Read the JSON document and add the two extra columns.
  val nonModifiedDataFrame = spark.read.json(documentPath)
    .withColumn("indexid", lit(indexid))
    .withColumn("indextimestamp", lit(indextimestamp))

  // Convert each row back to a JSON string.
  val convertedString: RDD[String] = nonModifiedDataFrame.toJSON.rdd

  // Replace dots in the JSON keys with underscores before indexing.
  val replacedString = convertedString.map { line =>
    ModifyKeysForDots().replaceDotsWithUnderScore(line)
  }

  val cfg = Map("es.resource" -> "indexfor_spark/_doc")
  EsSpark.saveJsonToEs(replacedString, cfg)
}
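For context, replaceDotsWithUnderScore renames JSON keys that contain dots, since Elasticsearch would otherwise try to interpret them as object paths. The real ModifyKeysForDots class is not included in this post; the sketch below is only a rough, assumed equivalent of what it does per line, written with the json4s library that ships with Spark.

import org.json4s._
import org.json4s.jackson.JsonMethods.{compact, parse, render}

// Rough sketch only (not the actual ModifyKeysForDots implementation):
// recursively rename every key containing '.' to use '_' instead.
def replaceDotsWithUnderScore(line: String): String = {
  def rename(value: JValue): JValue = value match {
    case JObject(fields) =>
      JObject(fields.map { case (key, v) => (key.replace('.', '_'), rename(v)) })
    case JArray(items) => JArray(items.map(rename))
    case other         => other
  }
  compact(render(rename(parse(line))))
}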
If I execute the above code in my local environment, i.e. with master set to local, it works fine and I can see the expected number of documents. However, if I run the same code on the cluster with master set to YARN, it fails with the error above.
Thanks in advance.