EsHadoopInvalidRequest: Malformed scrollId caused by es.scroll.limit

Hi,
The following simple command in Spark 2.2.0 works well:

df = (spark.read.format("org.elasticsearch.spark.sql")
      .option("es.nodes", "{my es nodes}")
      .option("es.port", "9200")
      .option("es.net.http.auth.user", "")
      .option("es.net.http.auth.pass", "")
      .option("es.net.proxy.http.host", "")
      .option("es.net.proxy.http.port", "")
      .option("es.net.proxy.http.user", "")
      .option("es.net.proxy.http.pass", "")
      .option("pushdown", "true")
      .load("{my index/type}")
      .select("date"))

If I add the option:

.option("es.scroll.limit",10)

I get the following error:

{...} Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: 
      ElasticsearchIllegalArgumentException[Malformed scrollId []]

Can anyone help?
Thank you
Patrick

Could you collect TRACE level logs for the job as well as a stack trace for your error and share them here?

May I ask how to collect the TRACE level logs? Do I have to update the log4j.properties file located in the spark/conf directory, as seen here?

Stack trace (I had to cut some lines because the body of my answer was too long). Note that the first df.show() worked but the second one caused an error:
In [7]: df.show()
+--------+
| date|
+--------+
|20150811|
|20150619|
...
|20150505|
|20140802|
+--------+
only showing top 20 rows

In [8]: df.show()
17/05/17 15:54:14 WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 41, 149.56.241.70, executor 2): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: ElasticsearchIllegalArgumentException[Malformed scrollId []]

at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:446)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436)
at org.elasticsearch.hadoop.rest.RestClient.scroll(RestClient.java:497)

...
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
...
at java.lang.Thread.run(Thread.java:745)

17/05/17 15:54:14 ERROR TaskSetManager: Task 0 in stage 7.0 failed 4 times; aborting job

Py4JJavaError Traceback (most recent call last)
in ()
----> 1 df.show()

/usr/local/spark/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
315 """
316 if isinstance(truncate, bool) and truncate:
--> 317 print(self._jdf.showString(n, 20))
318 else:
319 print(self._jdf.showString(n, int(truncate)))

/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
...
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling o110.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 45, 149.56.241.70, executor 0): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: ElasticsearchIllegalArgumentException[Malformed scrollId []]
...

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1469)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1457)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1456)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1456)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1684)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1628)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2015)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2036)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2785)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
at org.apache.spark.sql.Dataset$$anonfun$57.apply(Dataset.scala:2769)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2768)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2325)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:251)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: ElasticsearchIllegalArgumentException[Malformed scrollId []]

at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:446)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436)
at org.elasticsearch.hadoop.rest.RestClient.scroll(RestClient.java:497)
at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:375)
at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:112)
...
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more

I believe that logging solution should work. Could you also include which versions of Elasticsearch and ES-Hadoop you are using?

ES 1.7.1 and elasticsearch-spark-20_2.11-5.1.2.
Sorry for the question, but I don't know which line to update in the following log4j properties file:

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

Should I update the log4j.logger.org.apache.spark.repl.Main line (I'm in the pyspark shell) and set TRACE instead of WARN?
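For what it's worth, rather than changing the repl.Main line, it is probably cleaner to add a dedicated logger entry for the connector itself. A minimal sketch of the addition, assuming Spark's bundled log4j 1.x syntax and using the org.elasticsearch.hadoop.rest package that shows up in the stack trace (the exact category name here is my assumption, not something taken from the docs):

# Hypothetical lines appended to spark/conf/log4j.properties:
# turn on TRACE output for the ES-Hadoop REST layer only,
# leaving the rest of Spark at its current levels
log4j.logger.org.elasticsearch.hadoop.rest=TRACE
# or, more broadly, for the whole connector:
# log4j.logger.org.elasticsearch.hadoop=TRACE

Since the scroll requests run inside the tasks, the executors would presumably need the same change as well (the same file on each worker node, or shipped with the job, depending on how the cluster is deployed).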

This looks like it might be a backwards compatibility bug with Elasticsearch 1.7. Could you open an issue for this on Github?

Sure. Could you tell me where, please? I don't want to open the issue in the wrong place!

Thank you!

Done. Thanks James.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.