Hi,
I am trying to write a PySpark RDD into Elasticsearch, but the job fails with a GregorianCalendar error ("Data of type java.util.GregorianCalendar cannot be used"). Any suggestions or help would be appreciated.
es_fault_df = sqlContext.sql("select * from fault_search_all limit 10")

es_fault_rdd = es_fault_df.rdd.map(lambda item: ('key', {
    'objid': item['objid'], 'customer_name': item['customer_name'], 'veh_hdr': item['veh_hdr'],
    'road_number': item['road_number'], 'fleet_name': item['fleet_name'], 'model_desc': item['model_desc'],
    'fault_code': item['fault_code'], 'sub_id': item['sub_id'], 'fault_desc': item['fault_desc'],
    'occur_date': item['occur_date'], 'fault_reset_date': item['fault_reset_date'], 'fault_origin': item['fault_origin'],
    'record_type': item['record_type'], 'gps_latitude': item['gps_latitude'], 'gps_longitude': item['gps_longitude'],
    'offboard_load_date': item['offboard_load_date'], 'loco_speed': item['loco_speed'], 'engine_speed': item['engine_speed'],
    'notch': item['notch'], 'direction': item['direction'], 'hp': item['hp'],
    'water_temp': item['water_temp'], 'oil_temp': item['oil_temp'], 'mode_call': item['mode_call'],
    'loco_state_desc': item['loco_state_desc'], 'software_subid': item['software_subid']}))

es_fault_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)
Logs:
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.org$apache$spark$internal$io$SparkHadoopMapReduceWriter$$executeTask(SparkHadoopMapReduceWriter.scala:178)
at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$3.apply(SparkHadoopMapReduceWriter.scala:89)
at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$3.apply(SparkHadoopMapReduceWriter.scala:88)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: org.apache.spark.SparkException: Data of type java.util.GregorianCalendar cannot be used
at org.apache.spark.api.python.JavaToWritableConverter.org$apache$spark$api$python$JavaToWritableConverter$$convertToWritable(PythonHadoopUtil.scala:141)
at org.apache.spark.api.python.JavaToWritableConverter$$anonfun$org$apache$spark$api$python$JavaToWritableConverter$$convertToWritable$1.apply(PythonHadoopUtil.scala:134)
at org.apache.spark.api.python.JavaToWritableConverter$$anonfun$org$apache$spark$api$python$JavaToWritableConverter$$convertToWritable$1.apply(PythonHadoopUtil.scala:133)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.api.python.JavaToWritableConverter.org$apache$spark$api$python$JavaToWritableConverter$$convertToWritable(PythonHadoopUtil.scala:133)
at org.apache.spark.api.python.JavaToWritableConverter.convert(PythonHadoopUtil.scala:148)
at org.apache.spark.api.python.JavaToWritableConverter.convert(PythonHadoopUtil.scala:115)
at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$convertRDD$1.apply(PythonHadoopUtil.scala:181)
at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$convertRDD$1.apply(PythonHadoopUtil.scala:181)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:147)
at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:144)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.org$apache$spark$internal$io$SparkHadoopMapReduceWriter$$executeTask(SparkHadoopMapReduceWriter.scala:159)
... 8 more
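The only columns I can see that would reach the JVM as a calendar type are the date/timestamp ones (occur_date, fault_reset_date, offboard_load_date). One workaround I am considering, but have not verified, is converting those values to ISO-8601 strings in the map step before writing, roughly like the sketch below (it uses Row.asDict() instead of the explicit field dict, and the list of date columns is just my guess at the offending fields):

def to_es_doc(item):
    # Row -> plain dict; turn the datetime columns into ISO-8601 strings so they
    # are shipped as text rather than being converted to java.util.GregorianCalendar.
    doc = item.asDict()
    for col in ('occur_date', 'fault_reset_date', 'offboard_load_date'):
        if doc.get(col) is not None:
            doc[col] = doc[col].isoformat()
    return ('key', doc)

es_fault_rdd = es_fault_df.rdd.map(to_es_doc)

Is converting the dates to strings the right approach here, or is there a proper way to have EsOutputFormat handle date/timestamp values directly?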