Unable to parse response body for Response | ElasticsearchSinkConnector

Hello, I'm having a problem with Elasticsearch 8.3.2. I'm using Kafka Connect with the Elasticsearch sink connector, version 13.1.0, and the following error occurs:

Failed to execute bulk request due to 'java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://host.docker.internal:9200, response=HTTP/1.1 200 OK}' after 6 attempt(s)

I'm running the Kafka and Elasticsearch environments locally with Docker Compose. I used the same settings with previous Elasticsearch versions and everything worked normally; the problems only started after updating to 8.3.2.

Here is my Docker Compose configuration for Elasticsearch:

version: "3"

services:
  es01:
    container_name: es01
    image: docker.elastic.co/elasticsearch/elasticsearch:8.3.2
    environment:
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./data/es01:/usr/share/elasticsearch/data
      - ./data/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
      - ./data/elastic-certificates.p12:/usr/share/elasticsearch/config/elastic-certificates.p12
    ports:
      - 9200:9200
    extra_hosts:
      - "host.docker.internal:172.17.0.1"

  kib01:
    container_name: kib01
    image: docker.elastic.co/kibana/kibana:8.3.2
    depends_on:
      - es01
    ports:
      - 5601:5601
    volumes:
      - ./data/kibana.yml:/usr/share/kibana/config/kibana.yml
    extra_hosts:
      - "host.docker.internal:172.17.0.1"

  apm-server:
    container_name: apm-server
    image: docker.elastic.co/apm/apm-server:8.3.2
    depends_on:
      - es01
      - kib01
    cap_add: [ "CHOWN", "DAC_OVERRIDE", "SETGID", "SETUID" ]
    cap_drop: [ "ALL" ]
    ports:
      - 8200:8200
    volumes:
      - ./data/apm-server:/usr/share/apm-server/data
      - ./data/apm-server.yml:/usr/share/apm-server/apm-server.yml
    healthcheck:
      interval: 10s
      retries: 12
      test: curl --write-out 'HTTP %{http_code}' --fail --silent --output /dev/null http://localhost:8200/
    extra_hosts:
      - "host.docker.internal:172.17.0.1"

Here is the configuration of my Elasticsearch sink connector:

name=elasticsearch-sink-from-kafka
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=events.new-position,events.new-sale,events.new-checkout
connection.url=http://host.docker.internal:9200
connection.username=elastic
connection.password=elastic
type.name=_doc
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schema.ignore=true
key.ignore=true
drop.invalid.message=true
errors.tolerance=all
errors.deadletterqueue.topic.name=events.received-errors
errors.log.include.messages=true
errors.deadletterqueue.context.headers.enable=true
transforms=InsertField
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertField.timestamp.field=timestamp
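
Depending on how the worker is started, the same settings can also be submitted as JSON through the Kafka Connect REST API; a rough sketch of the equivalent request (trimmed to the core settings; the worker address kafka-connect:8083 is the one reported in the status output below):

curl -X PUT http://kafka-connect:8083/connectors/elasticsearch-sink-from-kafka/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "events.new-position,events.new-sale,events.new-checkout",
    "connection.url": "http://host.docker.internal:9200",
    "connection.username": "elastic",
    "connection.password": "elastic",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "key.ignore": "true",
    "drop.invalid.message": "true",
    "errors.tolerance": "all"
  }'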

Here is the full trace:

{
	"name": "elasticsearch-sink-from-kafka",
	"connector": {
		"state": "RUNNING",
		"worker_id": "kafka-connect:8083"
	},
	"tasks": [
		{
			"id": 0,
			"state": "FAILED",
			"worker_id": "kafka-connect:8083",
			"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed\n\tat io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:398)\n\tat org.elasticsearch.action.bulk.BulkRequestHandler$1.onFailure(BulkRequestHandler.java:64)\n\tat org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)\n\tat org.elasticsearch.action.ActionListener$RunAfterActionListener.onFailure(ActionListener.java:350)\n\tat org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)\n\tat org.elasticsearch.action.bulk.Retry$RetryHandler.onFailure(Retry.java:123)\n\tat io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:175)\n\t... 5 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: Failed to execute bulk request due to 'java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://host.docker.internal:9200, response=HTTP/1.1 200 OK}' after 6 attempt(s)\n\tat io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:165)\n\tat io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:119)\n\tat io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:426)\n\tat io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:169)\n\t... 5 more\nCaused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://host.docker.internal:9200, response=HTTP/1.1 200 OK}\n\tat org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2190)\n\tat org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)\n\tat org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)\n\tat org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:620)\n\tat io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$0(ElasticsearchClient.java:171)\n\tat io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:158)\n\t... 
8 more\nCaused by: java.lang.NullPointerException\n\tat java.base/java.util.Objects.requireNonNull(Objects.java:221)\n\tat org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:116)\n\tat org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:43)\n\tat org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:28)\n\tat org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:96)\n\tat org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:93)\n\tat org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:148)\n\tat org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:184)\n\tat org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)\n\tat org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:2105)\n\tat org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2188)\n\t... 13 more\n"
		}
	],
	"type": "sink"
}
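
The status above (including the trace) is what the Kafka Connect REST API returns for the connector; I fetch it with something like:

curl http://kafka-connect:8083/connectors/elasticsearch-sink-from-kafka/status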

Can someone help me? Thank you very much in advance.
