AWS, es-hadoop and 429

Hi forum,

I'm on AWS and trying to write ~1.2 million documents from an AWS Glue Python/PySpark job to a managed Elasticsearch.

To be fair, I should mention that I'm using AWS OpenSearch 1.2, but my question is more or less about the Elasticsearch-Hadoop integration in general.

The issue is that after a while I get "429 Too Many Requests":

org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [PUT] on [MY/doc/_bulk] failed; server[https://SOME_ENDPOINT_HERE] returned [429|Too Many Requests:]

From what I understand and have read so far, this is pretty much a matter of configuration: throttling indexing requests on the client side to give the server more time to process queued requests. That's what I tried, but somehow the config on the Hadoop connector does not work for me.

So I tried to send smaller batches of documents to Elasticsearch and increased the retry wait time: I set 'es.batch.size.entries' to 100 and 'es.batch.write.retry.wait' to 30s:

df \
    .write \
    .mode('overwrite') \
    .format('org.elasticsearch.spark.sql') \
    .option('es.nodes', 'SOME_ENDPOINT_HERE') \
    .option('es.port', 443) \
    .option('es.net.ssl', 'true') \
    .option('es.net.http.auth.user', 'SOME_USER_NAME_HERE') \
    .option('es.net.http.auth.pass', 'SOME_PASS_HERE') \
    .option('es.nodes.wan.only', 'true') \
    .option('es.nodes.discovery', 'false') \
    .option('es.resource', 'SOME_NAME_HERE') \
    .option('es.index.auto.create', 'true') \
    .option('es.mapping.id', 'SOME_FIELD_HERE') \
    .option('es.write.operation', 'index') \
    .option('es.batch.size.entries', '100') \
    .option('es.batch.write.retry.policy', 'simple') \
    .option('es.batch.write.retry.count', '-1') \
    .option('es.batch.write.retry.limit', '-1') \
    .option('es.batch.write.retry.wait', '30s') \
    .save()

I already set the 'org.elasticsearch.hadoop.rest' logger to DEBUG level:

Bulk Flush #[12653715211658247214404]: Sending batch of [34000] bytes/[1000] entries
Bulk Flush #[12653715211658247214404]: Response received
Bulk Flush #[12653715211658247214404]: Completed. [1000] Original Entries. [1] Attempts. [1000/1000] Docs Sent. [0/1000] Docs Skipped. [0/1000] Docs Aborted.

From what I understand, the Hadoop connector is sending batches of 1000 documents, not the 100 from my config. Furthermore, I cannot see any wait time between retries.
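
For anyone who wants to reproduce the DEBUG output above: the logger level can be raised from within the PySpark job itself. A rough sketch, assuming log4j 1.x (which Spark 2.4 ships with) and with sc being the active SparkContext (e.g. spark.sparkContext in a Glue job):

# Rough sketch: raise the es-hadoop REST layer to DEBUG via the Py4J gateway.
# Assumes log4j 1.x on the classpath; this only affects the driver JVM,
# executors have their own log4j configuration.
log4j = sc._jvm.org.apache.log4j
log4j.LogManager \
    .getLogger('org.elasticsearch.hadoop.rest') \
    .setLevel(log4j.Level.DEBUG)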

My current setup on AWS is:

Spark: 2.4.3
Python: 3.7
Elasticsearch: 7.10 / OpenSearch 1.2
Elasticsearch Hadoop: 7.13.4

Any hints or ideas on my setup?

Many Thanks,
Matthias

OpenSearch/OpenDistro are AWS run products and differ from the original Elasticsearch and Kibana products that Elastic builds and maintains. You may need to contact them directly for further assistance.

(This is an automated response from your friendly Elastic bot. Please report this post if you have any suggestions or concerns :elasticheart: )

This looks like it might be a bug. I tried it in both Scala and Python (in a Docker container running Spark on YARN), and saw the same thing you did. Interestingly (and frustratingly), it works fine if I add this as an integration test in the code base. For reference, here's the Python code I tried that failed:

data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
rdd = sc.parallelize(data)
df = rdd.toDF()
sc.setLogLevel('ALL')
df.write.mode('overwrite').format('org.elasticsearch.spark.sql').option('es.batch.size.entries', 1).option('es.resource', 'myindex').save()
sc.setLogLevel('INFO')

And here's the analogous Scala code that failed:

val data = Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000))
val rdd = sc.parallelize(data)
val df = rdd.toDF()
sc.setLogLevel("ALL")
df.write.mode("overwrite").format("org.elasticsearch.spark.sql").option("es.batch.size.entries", 1).option("es.resource", "myindex").save()
sc.setLogLevel("INFO")

In both cases, if I look for "Sending batch" in the logs, I see that it sent all 3 instead of 1.

Oh, as a coworker just pointed out, the problem is here: elasticsearch-hadoop/DefaultSource.scala at main · elastic/elasticsearch-hadoop · GitHub. In overwrite mode, first the data is deleted using a hard-coded es.batch.size.entries of 1000. That's what both of us were seeing in the driver log (I'm assuming). The actual writes are happening in the tasks, using the es.batch.size.entries that you specified. Unfortunately es.batch.size.entries is not configurable for that initial delete. You can either delete the documents ahead of time yourself, or avoid using overwrite. We probably ought to make the delete respect es.batch.size.entries if you set it, but that's not how it works right now.
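
If you just want to sidestep that hard-coded delete, switching the write mode away from overwrite is enough. A rough sketch based on my little repro above (index name and batch size are the same placeholder values):

# Rough sketch: with mode('append') the connector skips the driver-side
# pre-delete (which ignores es.batch.size.entries); the task-side writes
# use the batch size you configured. Note that, unlike overwrite, nothing
# is removed from the index up front.
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
df = sc.parallelize(data).toDF()
df.write \
    .mode('append') \
    .format('org.elasticsearch.spark.sql') \
    .option('es.batch.size.entries', 1) \
    .option('es.resource', 'myindex') \
    .save()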

Please also note, however, that Elasticsearch 7.10 is EOL and no longer supported.

Hi, thanks for your quick replies @Keith and @Mark.

Nice catch with "if (overwrite)" and the hard-coded batch size.

In case this is of interest to someone else:

I cleaned up the index manually beforehand and also adjusted the AWS instance sizing to "medium" (as suggested by the AWS support folks).
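
In case someone wants to script that cleanup instead of doing it by hand: deleting the index up front is a single HTTP call. A rough sketch, assuming the requests library is available in the job; endpoint, index name and credentials are placeholders:

import requests

# Rough sketch: drop the target index before the Glue job writes, so the
# overwrite pre-delete has nothing left to work through. All values below
# are placeholders.
resp = requests.delete(
    'https://SOME_ENDPOINT_HERE/SOME_NAME_HERE',
    auth=('SOME_USER_NAME_HERE', 'SOME_PASS_HERE'),
)
resp.raise_for_status()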

Indexing the 1.2 million documents now completes successfully in ~7 minutes, even with es.batch.size.entries=1000:

df \
     .write \
     .mode('overwrite') \
...
     .option('es.batch.size.entries', '1000') \
     .option('es.batch.size.bytes', '250000') \
     .option('es.batch.write.retry.policy', 'simple') \
     .option('es.batch.write.retry.count', '-1') \
     .option('es.batch.write.retry.limit', '-1') \
     .option('es.batch.write.retry.wait', '30s') \
     .save()

Again, thanks for the support.

Best,
Matthias
