Best practice to read ES from PySpark

I am trying to find the best way to read data from Elasticsearch (v5.1.1) through Apache Spark (v2.2.1). I am using the connector jar elasticsearch-spark-20_2.11-5.3.1.jar.

My question is mainly about reading array fields. My document schema is uniform within an index type, so I am trying to specify the schema while reading.

For example:

df_ES_Index = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "192.168.0.1:9200")
    .option("schema", schema_index)
    .load("index/index_type"))

The schema:

from pyspark.sql.types import (ArrayType, FloatType, IntegerType,
                               StringType, StructField, StructType)

schema_n_offset = StructType([
    StructField("length", IntegerType(), True),
    StructField("offset", StringType(), True)
])

schema_n_language = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", StringType(), True),
    StructField("field3", FloatType(), True),
    StructField("offsets", ArrayType(schema_n_offset), True)
])

schema_index = StructType([  # StructType (not StructField) at the top level
    StructField("languages", ArrayType(schema_n_language), True)
])

Here, my first-level field "languages" is recognized correctly as "array" in Spark. However, the field "offsets" within "languages" is read as a "struct" type in Spark. This results in an error:

Field 'languages.offsets' is backed by an array but the associated Spark Schema does not reflect this;

I know I can include/exclude fields to bypass this error ("es.read.field.as.array.include").
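Something along these lines (the field list is illustrative; it is a comma-separated set of dotted field names):

df_ES_Index = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "192.168.0.1:9200")
    # illustrative: tell the connector which fields should be treated as arrays
    .option("es.read.field.as.array.include", "languages,languages.offsets")
    .load("index/index_type"))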

I thought I wouldn't need to go through that if I could specify the schema while reading data from Elasticsearch. Does anyone have a suggestion for reading nested/array fields from Elasticsearch through Spark?

Thanks!


I was just bumming around in this part of the code recently. The deserialization code that performs the conversion from JSON document to Spark Row isn't aware of schema objects at the level it runs at. It does make sense that we should pick up array fields from a schema provided in the job driver and automatically mark each such field as an array if it isn't already. I'll open an issue for this.

I opened https://github.com/elastic/elasticsearch-hadoop/issues/1107 for automatically picking up array fields from a user supplied schema.


Thanks very much!

I have one follow-up question regarding Elasticsearch to DataFrame. When I read the index type with the schema specified, I always get a scala.MatchError:

scala.MatchError: [null,null,null,null,null,gmail.com,gmail.com,temp@gmail.com,temp@gmail.com,temp@gmail.com] (of class org.elasticsearch.spark.sql.ScalaEsRow)

But for the same ES document, if I save it and read it back as JSON, it works fine with no error.
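Roughly, that round trip looks like this (the output path is just a placeholder):

df_raw = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "192.168.1.1:9200")
    .load("index/type"))
# write the ES data out as JSON (placeholder path) ...
df_raw.write.mode("overwrite").json("/tmp/es_dump")
# ... and read it back with the explicit schema, which spark.read.json does honor
df_json = spark.read.schema(schema_assets_relationships).json("/tmp/es_dump")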

Also, I am wondering whether there is any way to enforce the schema. It looks like the schema is not respected and is simply ignored. I am following the syntax below.

For example:

df_ES_Assets_relationships = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "192.168.1.1:9200")
    .option("schema", schema_assets_relationships)
    .load("index/type"))

That looks pretty strange. Could you include the mapping for your index, the piece of offending data, and the Spark operations/settings you are using?
