Error with mapping ArrayType in pySpark "not found; typically this occurs with arrays which are not mapped as single value"


(Vianney Bailleux) #1

in database's elasticSearch I have this data:

{
    "titre": "Formation ElasticSearch",
    "sous-titre": "Mon sous titre",
    "formateurs": [
        {
            "prenom": "Martin",
            "nom": "Legros"
        }
    ],
    "jours": 3,
    "url": "http://test.fr"
}

formateurs is a array of person. here we have one person.

and I do this mapping on pySpark:

person= StructType([
    StructField("nom", StringType()),
    StructField("prenom", StringType()),
])

schema= StructType([
    StructField("titre", StringType()),
    StructField("sous-titre", StringType()),
    StructField("jours", LongType()),
    StructField("url", StringType()),
    StructField("formateurs", ArrayType(person)),
])

parcel= sqlContext.read.format("org.elasticsearch.spark.sql").schema(schema).load("zenika")
parcel.printSchema()
parcel.show(1)

I get this schema:

|-- titre: string (nullable = true)
|-- sous-titre: string (nullable = true)
|-- jours: long (nullable = true)
|-- url: string (nullable = true)
|-- formateurs: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- nom: string (nullable = true)
|    |    |-- prenom: string (nullable = true)

in this example there are no errors

but if i add one formateurs, i have one errors. example:

{
    "titre": "Formation ElasticSearch",
    "sous-titre": "Mon sous titre",
    "formateurs": [
        {
            "prenom": "Martin",
            "nom": "Legros"
        },
        {
            "prenom": "Marc",
            "nom": "Duchien"
        }
    ],
    "jours": 3,
    "url": "http://test.fr"
}

and I get this error:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'formateurs.nom' not found; typically this occurs with arrays which are not mapped as single value
    at org.elasticsearch.spark.sql.RowValueReader$class.rowColumns(RowValueReader.scala:51)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.rowColumns(ScalaEsRowValueReader.scala:32)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:69)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:968)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readListItem(ScrollReader.java:875)
    at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:927)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:833)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:1004)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:846)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:602)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:426)
    ... 27 more

would you be able to explain me how to make ArrayType because I did not find a tutorial with complex schema.

thank you so much.


(Vianney Bailleux) #2

just configure SparkContext with conf:

conf= SparkConf() \
    .set("es.read.field.as.array.include", "formateurs") \
    .set("es.nodes", "localhost") \
    .set( "es.port", "9200") \
    .set( "es.input.json", "yes")
sqlContext= SQLContext(sc)