I have a Hive external table that acts as an interface to Elasticsearch -
CREATE EXTERNAL TABLE IF NOT EXISTS analytics_metrics_workflow_es(
  workflow_id string,
  workflow_name string)
ROW FORMAT SERDE 'org.elasticsearch.hadoop.hive.EsSerDe'
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
  'es.nodes' = 'xxxx',
  'es.port' = '9200',
  'es.resource' = 'analytics_metrics_workflow/data',
  'es.index.auto.create' = 'true',
  'es.mapping.id' = 'workflow_id');
I have another Hive table as the source -
CREATE EXTERNAL TABLE IF NOT EXISTS analytics_metrics_workflow(
  workflow_id string,
  workflow_name string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/sd/analytics_metrics_workflow_sd';
When I insert data from the Hive shell, it works -
insert into analytics_metrics_workflow_es select workflow_id, workflow_name from analytics_metrics_workflow
But when I run the same statement through spark.sql("insert into analytics_metrics_workflow_es select workflow_id,workflow_name from analytics_metrics_workflow"), it throws an error -
scala> spark.sql("insert into analytics_metrics_workflow_es select workflow_id,workflow_name from analytics_metrics_workflow");
[Stage 0:> (0 + 2) / 2]18/09/10 07:16:46 ERROR Utils: Aborting task
java.lang.RuntimeException: cannot find field _col0 from [0:workflow_id, 1:workflow_name]
at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.getStandardStructFieldRef(ObjectInspectorUtils.java:416)
at org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector.getStructFieldRef(StandardStructObjectInspector.java:147)
at org.elasticsearch.hadoop.hive.HiveFieldExtractor.extractField(HiveFieldExtractor.java:52)
at org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor.field(ConstantFieldExtractor.java:36)
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:103)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
at org.elasticsearch.hadoop.hive.EsSerDe.serialize(EsSerDe.java:163)
at org.apache.spark.sql.hive.execution.HiveOutputWriter.write(HiveFileFormat.scala:153)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:392)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
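As a possible fallback I could write the same rows through the elasticsearch-spark connector directly instead of going through the Hive external table. A rough sketch of what I have in mind is below (assuming the elasticsearch-hadoop / elasticsearch-spark jar is on the Spark classpath and that the nodes, port, and resource match the TBLPROPERTIES above), but I would prefer to keep the insert through the Hive table and understand why it fails only from Spark -

// Sketch of a possible fallback, not what I currently run:
// write directly via the elasticsearch-spark connector instead of the Hive table.
// Assumes elasticsearch-hadoop is on the Spark classpath; connection values mirror the TBLPROPERTIES above.
import org.elasticsearch.spark.sql._

val df = spark.sql("select workflow_id, workflow_name from analytics_metrics_workflow")

// saveToEs comes from the org.elasticsearch.spark.sql implicits
df.saveToEs(
  "analytics_metrics_workflow/data",
  Map(
    "es.nodes"      -> "xxxx",
    "es.port"       -> "9200",
    "es.mapping.id" -> "workflow_id"))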
Any help would be highly appreciated.