Writing Spark DataFrame into Elasticsearch - Runs Successfully but Not All Data Dumped

I am writing data from my Spark DataFrame into Elasticsearch. I printed the schema and the total record count and everything looks fine until the dump starts. The job runs successfully and no issue/error is raised in the Spark job, but the index ends up with less data than it should.
I have about 1,800k records to dump, and sometimes only 500k are written, sometimes 800k, and so on.
Here is the main section of the code.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("spark.yarn.executor.memoryOverhead", "4096") \
    .enableHiveSupport() \
    .getOrCreate()

final_df = spark.read.load("/trans/MergedFinal_stage_p1", multiline="false", format="json")

print(final_df.count()) # It is perfectly ok
final_df.printSchema() # Schema is also ok

## Issue when the data gets written to Elasticsearch ##
final_df.write.mode("ignore").format(
        'org.elasticsearch.spark.sql'
    ).option(
        'es.nodes', ES_Nodes
    ).option(
        'es.port', ES_PORT
    ).option(
        'es.resource', ES_RESOURCE
    ).save()
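
One way to double-check how many documents actually land in the index after the job finishes is a quick count query. Here is a minimal sketch with the official elasticsearch Python client, assuming ES_Nodes is a single host and ES_RESOURCE is a plain index name (adjust for your setup):

from elasticsearch import Elasticsearch

# Connect to the same cluster the Spark job writes to (single-node assumption).
es = Elasticsearch([f"http://{ES_Nodes}:{ES_PORT}"])

# Refresh so recently indexed documents are visible, then count them.
es.indices.refresh(index=ES_RESOURCE)
print(es.count(index=ES_RESOURCE)["count"])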

My cluster resources are also fine.

Here is the command used to run the Spark job:

time spark-submit --class org.apache.spark.examples.SparkPi --jars elasticsearch-spark-30_2.12-7.14.1.jar --master yarn --deploy-mode cluster --driver-memory 6g --executor-memory 3g --num-executors 16 --executor-cores 2 main_es.py

I'm wondering if the problem is .mode("ignore"). The implementation of that mode in es-spark can be a little unintuitive: if the mode is "ignore" and es-spark finds that the index already exists or that there is data in the index, it will not write any data. I haven't tried this on large amounts of data yet, but I wonder whether that check gets called more than once (maybe an executor restarts?) and on subsequent calls nothing is written because the index exists by then. Here is the relevant code: elasticsearch-hadoop/DefaultSource.scala at v7.14.1 · elastic/elasticsearch-hadoop · GitHub. You could try using mode "append" instead.
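
For reference, the same write with "append" instead of "ignore" would look roughly like this (a minimal sketch reusing the ES_Nodes, ES_PORT and ES_RESOURCE variables from the question):

# With "append", the connector writes documents even if the index already exists.
final_df.write \
    .mode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", ES_Nodes) \
    .option("es.port", ES_PORT) \
    .option("es.resource", ES_RESOURCE) \
    .save()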
