Writing to ES using spark sql 2.0+


(ck) #1

Hi there ,
I am using elasticsearch-hadoop-5.3.2 jar and have created a table using HIVE cli that points to the underlying data in ElasticSearch. I can insert data into the hive table using INSERT commands and can retrieve it as well using Hive cli.
When i try to use spark-sql using the same above jar, I am able to perform SELECT but not INSERT. I have tried many INSERT statements with no success. Here is the stack trace I get. Please lmk if there are any pointers. I am on spark 2.0.1

17/05/11 14:40:08 ERROR SparkSQLDriver: Failed in [INSERT INTO TABLE rfm_es3
SELECT
user_id,
action_currency
FROM user_rfm_snapshot LIMIT 2]
java.lang.RuntimeException: java.lang.RuntimeException: class org.elasticsearch.hadoop.mr.EsOutputFormat$EsOutputCommitter not org.apache.hadoop.mapred.OutputCommitter
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
at org.apache.hadoop.mapred.JobConf.getOutputCommitter(JobConf.java:725)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.committer$lzycompute(hiveWriterContainers.scala:79)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.committer(hiveWriterContainers.scala:79)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.driverSideSetup(hiveWriterContainers.scala:88)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:130)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:234)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:142)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:310)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.Dataset.(Dataset.scala:186)
at org.apache.spark.sql.Dataset.(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682)
at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:331)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:247)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: class org.elasticsearch.hadoop.mr.EsOutputFormat$EsOutputCommitter not org.apache.hadoop.mapred.OutputCommitter
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2221)
... 35 more
java.lang.RuntimeException: java.lang.RuntimeException: class org.elasticsearch.hadoop.mr.EsOutputFormat$EsOutputCommitter not org.apache.hadoop.mapred.OutputCommitter
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
at org.apache.hadoop.mapred.JobConf.getOutputCommitter(JobConf.java:725)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.committer$lzycompute(hiveWriterContainers.scala:79)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.committer(hiveWriterContainers.scala:79)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.driverSideSetup(hiveWriterContainers.scala:88)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:130)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:234)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:142)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:310)
...
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: class org.elasticsearch.hadoop.mr.EsOutputFormat$EsOutputCommitter not org.apache.hadoop.mapred.OutputCommitter
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2221)
... 35 more


(system) #2

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.