Calling union on two DataFrames from Spark and Elasticsearch gets stuck

I am using PySpark to query Elasticsearch. I have the following code:
d1 = data_frame1.collect()  # returns 1 row
d2 = data_frame2.collect()  # returns no rows

but when I call

d3 = data_frame1.union(data_frame2).collect()

the code gets stuck.

The collection I am querying has 20 rows, and data_frame1 and data_frame2 have the same schema.

When I call collect on each DataFrame it returns the correct result, but when I union them and call collect it gets stuck.

I am using elasticsearch-spark-30_2.12-8.9.0 with Elasticsearch 8.9.2.

The query I am using on both DataFrames is:

'''
{
    "bool": {
        "must": [{ "term": { "field1": "x1213" } }]
    }
}
'''

Please help me!

It seems odd that it would get stuck on such a small amount of data. Have you checked the logs (the logs for the driver as well as the executors running tasks)?

Thanks for the reply. I checked the Spark logs and I don't see any errors; it just gets stuck on a single task. Is there anything I should be looking for? Where do I look for errors?

Did you check the task logs on each spark executor? That's where I would guess you would see something, but it is hard to know. You could also check the elasticsearch logs, but I wouldn't think that an elasticsearch error would cause spark to hang.
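
In case it helps, here is a quick way to get more detail on the driver side while it hangs. This is just a sketch: `spark` stands in for your SparkSession, and the DataFrame names follow your first post.

'''
# "spark" here is an assumed name for your SparkSession.
spark.sparkContext.setLogLevel("DEBUG")  # revert to "WARN" when done

# Print the full query plans of the hanging query; the physical plan
# shows which Elasticsearch scans, joins, and shuffles Spark will run.
data_frame1.union(data_frame2).explain(True)
'''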

OK, here is what I found out. We have 2 indexes:

index1 - with 20 docs
index2 - with 100,000 docs

dataframe1 is a join between 2 DataFrames:

dataframe3 - queries index1 (returns 1 row on dataframe3.collect())
dataframe4 - queries index2 (returns 1 row on dataframe4.collect())

so

dataframe1 = dataframe3.join(dataframe4)

When I call dataframe1.collect() it returns 1 row immediately. dataframe1 returns rows from index1.

dataframe2 queries index1 with a different query (returns 1 row on dataframe2.collect()).

When I do

dataframe1.union(dataframe2).collect()

it gets stuck...

What is very strange is that when I don't use dataframe4 in the join (so dataframe1 = dataframe3), everything works fine. Please help!
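
To summarize the structure as a sketch (the variable names match the description above, but the join keys are placeholders):

'''
# Sketch of the setup described above; join keys are placeholders.
dataframe1 = dataframe3.join(dataframe4, ["key1", "key2", "key3"])

dataframe1.collect()                    # returns 1 row immediately
dataframe2.collect()                    # returns 1 row immediately

dataframe1.union(dataframe2).collect()  # hangs

# With dataframe4 left out (so dataframe1 = dataframe3), it works:
dataframe3.union(dataframe2).collect()  # returns fine
'''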

Can you post your code? Or better yet, could you reproduce it in this docker image and give exact steps to reproduce it?

# index2 has columns: ["key1", "key2", "key3", "field1"]
# all of the same type

# init data frame 1:

options1 = {
    'es.nodes': 'xxxxxxxxxx',
    'es.resource': 'index1',
    'es.query': '''{
        "fields": ["key1", "key2", "key3", "key4", "key5", "key6"],
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {
                            "key1": "some_key"
                        }
                    }
                ]
            }
        },
        "_source": false
    }'''
}

schema1 = ....

reader1 = (spark_session
    .read.schema(schema1)
    .format('org.elasticsearch.spark.sql')
    .options(**options1))

df1 = reader1.load().select(["key1", "key2", "key3", "key4", "key5", "key6"])

# init data frame 2:

options2 = {
    'es.nodes': 'xxxxxxxxxx',
    'es.resource': 'index2',
    'es.query': '''{
        "fields": ["key1", "key2", "key3"],
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {
                            "key1": "some_key"
                        }
                    }
                ]
            }
        },
        "_source": false
    }'''
}

schema2 = ....

reader2 = (spark_session
    .read.schema(schema2)
    .format('org.elasticsearch.spark.sql')
    .options(**options2))

df2 = reader2.load().select(["key1", "key2", "key3"])

# join on the three shared key columns
df_1_2 = df1.join(df2, ["key1", "key2", "key3"])

# init data frame 3:

options3 = {
    'es.nodes': 'xxxxxxxxxx',
    'es.resource': 'index1',
    'es.query': '''{
        "fields": ["key1", "key2", "key3", "key4", "key5", "key6"],
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {
                            "some_field": "some field"
                        }
                    }
                ]
            }
        },
        "_source": false
    }'''
}

reader3 = (spark_session
    .read.schema(schema1)
    .format('org.elasticsearch.spark.sql')
    .options(**options3))

df3 = reader3.load().select(["key1", "key2", "key3", "key4", "key5", "key6"])

# unioning the join result with df3 is the step that hangs on collect
res = df_1_2.union(df3)
d = res.collect()

Please, please help!
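
One diagnostic worth trying, as a sketch rather than a confirmed fix: it assumes the hang comes from executing both Elasticsearch scans inside a single plan, and uses localCheckpoint to materialize the join result first.

'''
# Eagerly materialize the join result and cut its lineage, so the
# union no longer runs both Elasticsearch reads in one plan.
df_1_2_materialized = df_1_2.localCheckpoint(eager=True)

res = df_1_2_materialized.union(df3)
d = res.collect()
'''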
