Having trouble executing an isin query from PySpark to Elasticsearch


I'm trying to implement a simple scenario: getting data from Elasticsearch into my Spark program.

I have two indexes.
The first one is a search index. Using it, I fetch messageIds based on a matching value in one attribute.

```python
# Fetch messageIds from the search index that match the search term
q = """{
  "query": {
    "match": {"column": "valueXXXX"}
  }
}"""

reader = (sqlContext.read.format("org.elasticsearch.spark.sql")
          .option("es.nodes", connection_url)
          .option("es.query", q))
s_df = reader.load("seachIndex1.0/search")
# Collect up to 5000 distinct IDs to the driver as plain str (no .encode(), which yields bytes on Python 3)
df_messageIds = [r[0] for r in s_df.select('message_id').distinct().take(5000)]
```

This index contains only message IDs. My scenario is to use these message IDs to fetch the messages from the other index.
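A minimal sketch of one way to send the IDs along with the query: embed them in an Elasticsearch `terms` query and pass that string as `es.query` for the message index, so the filtering happens inside Elasticsearch instead of in Spark. The helper name `build_terms_query` is hypothetical, and the sketch assumes `system_id` is mapped as a `keyword` field (a `terms` query matches exact terms, so an analyzed `text` field holding UUIDs may not match).

```python
import json
from typing import Iterable


def build_terms_query(field: str, ids: Iterable[str]) -> str:
    """Return a Query DSL string matching documents whose `field` equals any ID.

    Hypothetical helper: assumes `field` is a keyword (exact-match) field.
    """
    # Deduplicate while preserving order to keep the request small and stable.
    unique_ids = list(dict.fromkeys(ids))
    query = {"query": {"terms": {field: unique_ids}}}
    return json.dumps(query)


# Example with the two IDs from the original filter.
message_ids = [
    "513ed057-f40e-43f8-b675-8fb3886c7640",
    "68d79d86-a7e2-46de-9aaf-f89341e66fe4",
]
q = build_terms_query("system_id", message_ids)
print(q)
```

The resulting string could replace the `match_all` query in the `.option("es.query", q)` call before `reader.load("messageIndex1.0/message")`; this has not been checked against a live cluster.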

```python
from pyspark.sql.functions import col

# Fetch messages from the message index
q = """{
  "query": {
    "match_all": {}
  }
}"""

reader = (sqlContext.read.format("org.elasticsearch.spark.sql")
          .option("es.nodes", connection_url)
          .option("es.query", q)
          .option("pushdown", "true"))
m_df = reader.load("messageIndex1.0/message")
filtered_df = m_df.filter(col('system_id').isin(['513ed057-f40e-43f8-b675-8fb3886c7640', '68d79d86-a7e2-46de-9aaf-f89341e66fe4']))
```

The issue is that, in the current setup, filtering the messages by ID takes too much time.

I tried joining the two DataFrames.
I tried using an isin clause.
I am also not able to send the list of message IDs along with the query to Elasticsearch.

I also tried the pushdown feature, but somehow it does not work with the IN clause either.
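If the ID list is large, a single `terms` query can become unwieldy, and Elasticsearch limits how many terms one query may carry (the `index.max_terms_count` index setting). A hedged sketch of splitting the IDs into batches, using a hypothetical helper `batched_terms_queries` and an arbitrary batch size of 1000:

```python
import json
from typing import Iterator, List, Sequence


def batched_terms_queries(field: str, ids: Sequence[str],
                          batch_size: int = 1000) -> Iterator[str]:
    """Yield one `terms` query JSON string per batch of at most batch_size IDs.

    Hypothetical helper; batch_size=1000 is an arbitrary assumption.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(ids), batch_size):
        batch: List[str] = list(ids[start:start + batch_size])
        yield json.dumps({"query": {"terms": {field: batch}}})


# Example: 2500 made-up IDs split into batches of 1000, 1000 and 500.
ids = [f"id-{i}" for i in range(2500)]
queries = list(batched_terms_queries("system_id", ids, batch_size=1000))
print(len(queries))  # 3
```

Each query string could be used as `es.query` for its own read of the message index, and the resulting DataFrames combined with `DataFrame.union`; whether this beats a single query depends on the cluster and is untested here.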

Please suggest how I can optimize this scenario.

