MySQL Connector to Elasticsearch Cloud Failing with ElasticsearchOverloadedError and Bulk Request Issues

I'm new to Elasticsearch.

I'm encountering issues while syncing data from MySQL to Elasticsearch Cloud (free 15-day trial tier) using the MySQL connector. The connector connects to both MySQL and Elasticsearch, but the sync job keeps failing with the following error messages:

Error logs from the connector:

[FMWK][15:18:09][INFO] [Connector id: ADMp35MBB5sivtOKoyGn, index name: connector-mysql-74b1] Successfully connected to the MySQL Server.
[FMWK][15:18:53][WARNING] Client method 'bulk' retry 1: connection timeout
[FMWK][15:20:50][ERROR] [Connector id: ADMp35MBB5sivtOKoyGn, index name: connector-mysql-74b1, Sync job id: Rv9435MB6DbbSf4pe9a3] Extractor failed with an error: MemQueue has been full for 600.5129s. while timeout is 600s.
[FMWK][15:20:50][ERROR] [Connector id: ADMp35MBB5sivtOKoyGn, index name: connector-mysql-74b1, Sync job id: Rv9435MB6DbbSf4pe9a3] Sync was throttled by Elasticsearch
[FMWK][15:20:51][INFO] [Connector id: ADMp35MBB5sivtOKoyGn, index name: connector-mysql-74b1, Sync job id: Rv9435MB6DbbSf4pe9a3] Canceling the Sink task: Sink for JobType.FULL sync to connector-mysql-74b1

I also see this in the logs:

[FMWK][15:27:27][ERROR] Exception found for task Elasticsearch Sink: _bulk batch #2500
Traceback (most recent call last):
  File "/app/connectors/logger.py", line 264, in _awrapped
    return await func(*args, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/connectors/es/sink.py", line 191, in _batch_bulk
    res = await self.client.bulk_insert(operations, self.pipeline["name"])
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/connectors/es/management_client.py", line 277, in bulk_insert
    return await self._retrier.execute_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/connectors/es/client.py", line 258, in execute_with_retry
    raise RetryInterruptedError(msg)
connectors.es.client.RetryInterruptedError: Retry operation was interrupted
[The identical traceback repeats for _bulk batches #1500, #2000, #1000, and #500.]

In the Elasticsearch UI I got this:

Sync failure: ElasticsearchOverloadedError: Connector was unable to ingest data into overloaded Elasticsearch. Make sure Elasticsearch instance is healthy, has enough resources and content index is healthy..

Hi @Zikou,

The error message can be interpreted as "the connector was unable to send any data to Elasticsearch within 600 seconds". The MemQueue in your logs is the connector's in-memory queue of documents waiting to be bulk-indexed; it stayed full for the whole 600s timeout because Elasticsearch wasn't draining it fast enough. That can happen because Elasticsearch is overloaded, because there are shard problems with your content index, and so on.

Is your Elasticsearch instance healthy?
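
If you're not sure, the cluster health API will tell you. Here's a minimal check with the Python Elasticsearch client (the endpoint and credentials below are placeholders for your own Cloud deployment):

```python
from elasticsearch import Elasticsearch

# Placeholders: substitute your Cloud endpoint and credentials.
es = Elasticsearch(
    "https://<your-deployment>.es.io:443",
    basic_auth=("elastic", "<password>"),
)

# Overall cluster status: green / yellow / red.
print(es.cluster.health())

# Health of the content index the sync writes to.
print(es.cluster.health(index="connector-mysql-74b1"))
```

A red or yellow status, or unassigned shards on the content index, would explain why the sync is being throttled.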

Also, setting service.log_level: DEBUG in your config file can help with troubleshooting; the logs will contain more info about the Elasticsearch calls.
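
If Elasticsearch stays overloaded even though the cluster looks healthy, you can also try reducing the bulk pressure the connector generates. Here's a sketch of the relevant config.yml settings, assuming the connectors-python framework (key names and defaults can vary between versions, so double-check against your framework's config):

```yaml
service:
  log_level: DEBUG       # more detail about each Elasticsearch call

elasticsearch:
  bulk:
    chunk_size: 500      # fewer documents per _bulk request
    max_concurrency: 2   # fewer _bulk requests in flight at once
```

Smaller, less concurrent bulk requests give a resource-constrained trial instance a better chance of draining the MemQueue before the 600s timeout.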