Spark / Elasticsearch Exception: Maybe ES was overloaded?


(Russell Jurney) #1

I am having trouble testing some of the code from my new book, Agile Data Science 2.0. I am writing from PySpark to Elasticsearch and keep running into an error.

This is local Spark on one node with local elasticsearch, as these are examples in a book. This is running on a r4.xlarge EC2 instance on Ubuntu. The Parquet data is 155MB.

The script is here: ch04/pyspark_to_elasticsearch.py

It looks like:

# Load the parquet file
on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet')

on_time_dataframe.repartition(1).write.format("org.elasticsearch.spark.sql")\
  .repartition(1)\
  .option("es.resource","agile_data_science/on_time_performance")\
  .mode("overwrite")\
  .save()

Note that I added the call to repartition to try to throttle the work. After a few minutes, I get this error: https://gist.github.com/rjurney/ec0d6b1ef050e3fbead2314255f4b6fa

The take home message is:

[agile_data_science][0] primary shard is not active Timeout: [1m], request: [BulkShardRequest to [agile_data_science] containing [1000] requests]

What can I do to make this work? It looks like Elasticsearch is getting overloaded, but this is one Spark partition so I don't know how to throttle it further. Note that some records are getting written, I can search them afterwards but they don't all make it.

Would adding more shards help? I just don't know. Any suggestions would be appreciated.

Thanks!


(James Baiera) #2

@rjurney Are there any logs that appear in the Elasticsearch log that may highlight why the primary shard becomes inactive during the load?


(Russell Jurney) #3

I fixed it by changing the batch size from 1,000 to 100.

    # Load the parquet file
    on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet')

    # Save the DataFrame to Elasticsearch
    on_time_dataframe.write.format("org.elasticsearch.spark.sql")\
      .option("es.resource","agile_data_science/on_time_performance")\
      .option("es.batch.size.entries","100")\
      .mode("overwrite")\
      .save()

(system) #4

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.