Error while trying to insert data into Elasticsearch using the Hive Elasticsearch storage handler from Spark

I have a Hive external table that serves as an interface to Elasticsearch:

CREATE EXTERNAL TABLE IF NOT EXISTS analytics_metrics_workflow_es(
    workflow_id string,
    workflow_name string)
    ROW FORMAT SERDE 'org.elasticsearch.hadoop.hive.EsSerDe'
    STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
    TBLPROPERTIES(
        'es.nodes' = 'xxxx',
        'es.port' = '9200',
        'es.resource' = 'analytics_metrics_workflow/data',
        'es.index.auto.create' = 'true',
        'es.mapping.id' = 'workflow_id');

I have another Hive table as the source:

CREATE EXTERNAL TABLE IF NOT EXISTS analytics_metrics_workflow(
    workflow_id string,
    workflow_name string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION '/user/sd/analytics_metrics_workflow_sd';

When I insert data from the Hive shell, it works:

insert into analytics_metrics_workflow_es select workflow_id, workflow_name from analytics_metrics_workflow;

But when I run the same insert through spark.sql from spark-shell, it throws an error:

scala> spark.sql("insert into analytics_metrics_workflow_es select workflow_id,workflow_name from analytics_metrics_workflow");
[Stage 0:>                                                          (0 + 2) / 2]18/09/10 07:16:46 ERROR Utils: Aborting task
java.lang.RuntimeException: cannot find field _col0 from [0:workflow_id, 1:workflow_name]
	at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.getStandardStructFieldRef(ObjectInspectorUtils.java:416)
	at org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector.getStructFieldRef(StandardStructObjectInspector.java:147)
	at org.elasticsearch.hadoop.hive.HiveFieldExtractor.extractField(HiveFieldExtractor.java:52)
	at org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor.field(ConstantFieldExtractor.java:36)
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:103)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
	at org.elasticsearch.hadoop.hive.EsSerDe.serialize(EsSerDe.java:163)
	at org.apache.spark.sql.hive.execution.HiveOutputWriter.write(HiveFileFormat.scala:153)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:392)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
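
The column names look correct on the Spark side; the select by itself reports the expected schema (a minimal check, using only the tables defined above):

spark.sql("select workflow_id, workflow_name from analytics_metrics_workflow").printSchema()
// prints:
// root
//  |-- workflow_id: string (nullable = true)
//  |-- workflow_name: string (nullable = true)

So the internal name _col0 in the message seems to appear only on the write path: the es.mapping.id field extractor asks for _col0, while the row handed to EsSerDe carries the real names workflow_id and workflow_name.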

Any help would be highly appreciated.
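
For reference, a minimal sketch of the same write through the native elasticsearch-spark data source instead of the Hive SerDe (assuming the elasticsearch-hadoop jar is on the Spark classpath; the option values mirror the TBLPROPERTIES above, and this is an alternative path to try, not a confirmed fix):

// Read the source Hive table into a DataFrame and write it to
// Elasticsearch via the es-hadoop Spark SQL integration.
val df = spark.sql("select workflow_id, workflow_name from analytics_metrics_workflow")

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "xxxx")
  .option("es.port", "9200")
  .option("es.mapping.id", "workflow_id")   // document _id, as in the table definition
  .mode("append")
  .save("analytics_metrics_workflow/data")  // matches es.resource above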
