Elasticsearch-spark-30 read missing field (double type) error


Hi!
When I use elasticsearch-spark-30_2.12-7.16.1.jar to read data, a field of double type is missing, which causes my Spark application to throw an exception. But when I use elasticsearch-spark-20_2.11-7.6.1.jar, I can read the data successfully. Will this problem be fixed?

You'll have to provide more information. What version of Spark are you using (you are using es-spark jars for two completely different versions of Spark, so I'm surprised that one of them works at all)? What version of Elasticsearch are you using? Can you provide code (including mappings and data) to reproduce this? And can you paste the stack trace?

Issue description

First, I set the value of a double-type field to an empty string.
Then, when I use Spark to read this field, it throws an exception (Cannot parse value [] for field [field name]).
I learned that empty values can be read as null, so I set es.field.read.empty.as.null to true. Unfortunately, another exception was thrown (scala.None$ is not a valid external type for schema of double), leaving me stuck in a loop.
I believe this may be a bug. Please check.
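
For what it's worth, the second error can be reproduced in plain Spark with no Elasticsearch involved; a minimal sketch (run in spark-shell, the names are mine) showing that Spark's row encoder accepts null but rejects scala.None for a double column:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// A nullable double column, matching the index field in question.
val schema = StructType(Seq(StructField("C", DoubleType, nullable = true)))

// null is a valid external value for a double column:
spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row.fromSeq(Seq(null)))), schema).show()

// scala.None is not; this throws
// "scala.None$ is not a valid external type for schema of double":
spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row.fromSeq(Seq(None)))), schema).show()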

Steps to reproduce

PS: the index mapping for the field is "C": {"type": "double"}

 session.read().format("org.elasticsearch.spark.sql")
            .option("es.field.read.empty.as.null", "true")
            .load(indexName);
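
For completeness, here is a fuller sketch of the failing read; the SparkSession setup, es.nodes value, and index name below are illustrative, not copied from the real job:

import org.apache.spark.sql.SparkSession

// Illustrative local setup; adjust es.nodes to the real cluster.
val session = SparkSession.builder()
  .appName("es-empty-double-repro")
  .master("local[*]")
  .config("es.nodes", "localhost:9200")
  .getOrCreate()

session.read.format("org.elasticsearch.spark.sql")
  .option("es.field.read.empty.as.null", "true")
  .load("etest7")   // index whose mapping contains "C": {"type": "double"}
  .show()           // fails with the stack traces below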

Stack traces:

es.field.read.empty.as.null = true:
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.None$ is not a valid external type for schema of double
at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
... 19 more

es.field.read.empty.as.null = false:
Caused by: org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [] for field [C]
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:903)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:1047)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:889)
at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:602)
... more


Version Info

OS          :  Win7 64-bit
JVM         :  JDK 1.8
Hadoop/Spark:  spark-sql_2.12-3.2.1.jar
ES-Hadoop   :  elasticsearch-spark-30_2.12-7.16.1.jar (there is no such problem when I use elasticsearch-spark-20_2.11-7.6.1.jar)
ES          :  6.8.14 (and I hit the same error when using 7.17.0)

I haven't been able to reproduce it. Here's what I did in my test Docker container, which is running Elasticsearch 8.1.0 and Spark 3.2.1.
First, I created the mapping and data in Elasticsearch:

curl -X PUT "localhost:9200/test?pretty" -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "C": {"type": "double"}
    }
  }
}
'
curl -X POST localhost:9200/test/_doc/ -H 'Content-Type: application/json' -d'
{
  "C": ""
}
'
curl -X POST localhost:9200/test/_doc/ -H 'Content-Type: application/json' -d'
{
  "C": "1.5"
}
'

Then I ran es-spark and did this:

import org.apache.spark.sql._
val sqc = new SQLContext(sc)
sqc.read.format("es").option("es.field.read.empty.as.null","true").load("test").show
+----+                                                                          
|   C|
+----+
|null|
| 1.5|
+----+
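
For contrast, explicitly turning the option off should reproduce the parse failure from your first report (a sketch; I did not rerun it):

sqc.read.format("es")
  .option("es.field.read.empty.as.null", "false")
  .load("test")
  .show()   // expected: EsHadoopParsingException: Cannot parse value [] for field [C]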

If I add an empty string to an array of values for the field:

elastic@localhost:~$ curl -X POST localhost:9200/test/_doc/ -H 'Content-Type: application/json' -d'
{
  "C": ["2.5", ""]
}
'

I get this exception:

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, C), DoubleType) AS C#5
  at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: scala.collection.convert.Wrappers$JListWrapper is not a valid external type for schema of double
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)
  ... 19 more

But that looks different from what you are seeing.
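
That JListWrapper exception is Spark objecting that the field arrived as a list while the inferred schema says scalar double. If the field can legitimately hold arrays, es-spark has to be told up front via es.read.field.as.array.include; a sketch (this addresses the array/scalar mismatch, not the empty-string parsing itself):

sqc.read.format("es")
  .option("es.read.field.as.array.include", "C")   // map C as an array field
  .option("es.field.read.empty.as.null", "true")
  .load("test")
  .show()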

[Screenshot: index etest7, field C of double type]

[Screenshot: C is null; Elasticsearch version is 7.17.0]

[Screenshot: I can read successfully when C has no value]

[Screenshot: putting an empty string into C]

[Screenshot: stack trace with es.field.read.empty.as.null=false]

[Screenshot: stack trace with es.field.read.empty.as.null=true]

Thank you very much for your prompt reply.

First of all, I'm sorry that I can only upload photos, which may be difficult to read; my company does not allow me to develop programs on Internet-connected machines. Also, I see that the version you are using is different from mine.

These are the Maven dependencies I used for this verification:
elasticsearch-spark-30_2.12-7.16.1
spark-sql_2.12-3.2.1

Elasticsearch version 7.17.0

I see you're using Elasticsearch 7.17 and Spark 3.2. Spark 3.2 support was not added until 8.0 -- https://github.com/elastic/elasticsearch-hadoop/pull/1807. I don't know whether that is related to your current problem, but you will run into problems with that combination. Spark made some breaking changes between 3.0 and 3.2.
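
In other words, either pair Spark 3.2 with an 8.x es-hadoop artifact or drop back to a Spark version the 7.16 artifact supports. A hypothetical build sketch of the former, in sbt syntax; the exact versions are illustrative:

// build.sbt sketch: Spark 3.2.x with elasticsearch-spark-30 8.x,
// where Spark 3.2 support landed (per the PR linked above).
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"              % "3.2.1" % Provided,
  "org.elasticsearch" %% "elasticsearch-spark-30" % "8.1.0"
)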

Thanks for the reminder, I will try that.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.