Hello,
I am trying to query an index with PySpark. The documents look like:
{
    "field1": "field1_data",
    "field2": [
        {
            "field2_1": "x1",
            "field2_2": "x2"
        },
        {
            "field2_1": "x3",
            "field2_2": "x4"
        }
    ]
}
If I query from Kibana:
POST my_index/_search
{
    "query": {
        "bool": {
            "must": [],
            "filter": [
                {
                    "terms": {
                        "field2.field2_1": ["x1"]
                    }
                }
            ]
        }
    },
    "_source": ["field1", "field2.field2_1"]
}
it returns everything fine:
hits =
{
    "field1": "field1_data",
    "field2": [
        {
            "field2_1": "x1"
        }
    ]
}
Now I try the same with PySpark.
The DataFrame schema is:
index1_schema = StructType([
    StructField("field1", StringType(), nullable=True),
    StructField("field2", ArrayType(StructType([
        StructField("field2_1", StringType(), nullable=True),
        StructField("field2_2", StringType(), nullable=True)
    ]), containsNull=False), nullable=True)
])
I create the Spark options for Elasticsearch:
options = {
    "es.nodes": ....,
    "es.resource": "my_index",
    "es.query": '''
    {
        "query": {
            "bool": {
                "must": [],
                "filter": [
                    {
                        "terms": {
                            "field2.field2_1": ["x1"]
                        }
                    }
                ]
            }
        },
        "_source": ["field1", "field2.field2_1"]
    }
    '''
}
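Since the query is passed to the connector as a raw string via "es.query", one sanity check worth doing (a stdlib-only sketch, independent of Spark and Elasticsearch, using a standalone copy of the query string) is to parse it with json.loads before handing it to the reader; malformed JSON here can fail silently or produce unexpected results:

```python
import json

# Standalone copy of the query string passed via "es.query",
# reproduced here purely for validation.
es_query = '''
{
    "query": {
        "bool": {
            "must": [],
            "filter": [
                {"terms": {"field2.field2_1": ["x1"]}}
            ]
        }
    },
    "_source": ["field1", "field2.field2_1"]
}
'''

# json.loads raises a ValueError on malformed JSON, so this
# catches syntax mistakes before the query reaches Elasticsearch.
parsed = json.loads(es_query)
print(parsed["_source"])
```

Note that the "terms" query expects an array of values; a bare string in that position is one of the things this check will not catch, but Elasticsearch itself will reject it.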
I create a DataFrame reader:
reader = sparkSession.read.schema(index1_schema).format('org.elasticsearch.spark.sql').options(**options)
then I do:
df = reader.load().select(['field1', 'field2'])
The resulting DataFrame looks like:
field1        | field2
==============================
"field1_data" | [{Null, Null}]
where field2 contains an array of PySpark Row objects with a single row:
{
    "field2_1": None
}
What am I doing wrong? Any help would be appreciated.