I use PySpark to read a Parquet file from HDFS, and one of the columns has type timestamp. When I write the DataFrame to Elasticsearch I get the exception "Data of type java.util.GregorianCalendar cannot be used", but when I leave the timestamp column out, it works fine.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("python es") \
    .config("spark.master", "yarn") \
    .getOrCreate()
path = "/data/file/test.parquet"
df = spark.read.load(path, format='parquet')
esClient = ESClient(index=db_name, type=table_name, id_field=id_field)
esClient.writeDF2ES(df)
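For example, if I drop the timestamp column before writing, the write succeeds (the column name below is just a placeholder for my real one):

# 'create_time' is a placeholder; my real timestamp column has a different name
df_no_ts = df.drop("create_time")
esClient.writeDF2ES(df_no_ts)  # works fine without the timestamp column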
es_conf = {
    'es.mapping.id': 'xxxxx',
    'es.nodes': 'xxxxx',
    'es.resource': 'xxxxx'
}
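(The values are redacted above; for illustration they have roughly this shape, with placeholder values:)

es_conf = {
    'es.mapping.id': 'doc_id_field',    # placeholder: field used as the document id
    'es.nodes': 'es-host:9200',         # placeholder: Elasticsearch host:port
    'es.resource': 'my_index/my_type'   # placeholder: index/type to write into
}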
def writeRDD2ES(self, rdd):
    try:
        rdd.saveAsNewAPIHadoopFile(
            path='-',
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            conf=es_conf)
    except Exception:
        self.logger.error("Failed to write to es(%s)" % self)
        raise
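writeDF2ES is my own wrapper; simplified, it just turns the DataFrame into a pair RDD of dicts before calling writeRDD2ES (the real code also handles the id field), roughly like this:

# simplified sketch of my wrapper, details omitted
def writeDF2ES(self, df):
    rdd = df.rdd.map(lambda row: (None, row.asDict()))
    self.writeRDD2ES(rdd)

The timestamp column shows up as a Python datetime in row.asDict(), and as far as I can tell that is the value that ends up as a java.util.GregorianCalendar on the JVM side, which the converter then rejects.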
Here is the stack trace:

Caused by: org.apache.spark.SparkException: Data of type java.util.GregorianCalendar cannot be used
at org.apache.spark.api.python.JavaToWritableConverter.org$apache$spark$api$python$JavaToWritableConverter$$convertToWritable(PythonHadoopUtil.scala:141)
at org.apache.spark.api.python.JavaToWritableConverter$$anonfun$org$apache$spark$api$python$JavaToWritableConverter$$convertToWritable$1.apply(PythonHadoopUtil.scala:134)
at org.apache.spark.api.python.JavaToWritableConverter$$anonfun$org$apache$spark$api$python$JavaToWritableConverter$$convertToWritable$1.apply(PythonHadoopUtil.scala:133)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.api.python.JavaToWritableConverter.org$apache$spark$api$python$JavaToWritableConverter$$convertToWritable(PythonHadoopUtil.scala:133)
at org.apache.spark.api.python.JavaToWritableConverter.convert(PythonHadoopUtil.scala:148)
at org.apache.spark.api.python.JavaToWritableConverter.convert(PythonHadoopUtil.scala:115)
at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$convertRDD$1.apply(PythonHadoopUtil.scala:181)
at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$convertRDD$1.apply(PythonHadoopUtil.scala:181)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1124)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1123)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1123)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)