Hello, I'm having a problem with Elasticsearch 8.3.2. I'm using Kafka Connect with the Elasticsearch sink connector, version 13.1.0, and the following error occurs:
Failed to execute bulk request due to 'java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://host.docker.internal:9200, response=HTTP/1.1 200 OK}' after 6 attempt(s)
I'm running Kafka and Elasticsearch locally using Docker Compose. The same settings worked normally with previous Elasticsearch versions; the problem started after I updated to 8.3.2.
Here is my docker compose configuration for Elasticsearch:
version: "3"
services:
  es01:
    container_name: es01
    image: docker.elastic.co/elasticsearch/elasticsearch:8.3.2
    environment:
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./data/es01:/usr/share/elasticsearch/data
      - ./data/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
      - ./data/elastic-certificates.p12:/usr/share/elasticsearch/config/elastic-certificates.p12
    ports:
      - 9200:9200
    extra_hosts:
      - "host.docker.internal:172.17.0.1"
  kib01:
    container_name: kib01
    image: docker.elastic.co/kibana/kibana:8.3.2
    depends_on:
      - es01
    ports:
      - 5601:5601
    volumes:
      - ./data/kibana.yml:/usr/share/kibana/config/kibana.yml
    extra_hosts:
      - "host.docker.internal:172.17.0.1"
  apm-server:
    container_name: apm-server
    image: docker.elastic.co/apm/apm-server:8.3.2
    depends_on:
      - es01
      - kib01
    cap_add: [ "CHOWN", "DAC_OVERRIDE", "SETGID", "SETUID" ]
    cap_drop: [ "ALL" ]
    ports:
      - 8200:8200
    volumes:
      - ./data/apm-server:/usr/share/apm-server/data
      - ./data/apm-server.yml:/usr/share/apm-server/apm-server.yml
    healthcheck:
      interval: 10s
      retries: 12
      test: curl --write-out 'HTTP %{http_code}' --fail --silent --output /dev/null http://localhost:8200/
    extra_hosts:
      - "host.docker.internal:172.17.0.1"
Here is the configuration of my Elasticsearch sink connector:
name=elasticsearch-sink-from-kafka
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=events.new-position,events.new-sale,events.new-checkout
connection.url=http://host.docker.internal:9200
connection.username=elastic
connection.password=elastic
type.name=_doc
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schema.ignore=true
key.ignore=true
drop.invalid.message=true
errors.tolerance=all
errors.deadletterqueue.topic.name=events.received-errors
errors.log.include.messages=true
errors.deadletterqueue.context.headers.enable=true
transforms=InsertField
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertField.timestamp.field=timestamp
Here is the full trace:
{
  "name": "elasticsearch-sink-from-kafka",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "kafka-connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed\n\tat io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:398)\n\tat org.elasticsearch.action.bulk.BulkRequestHandler$1.onFailure(BulkRequestHandler.java:64)\n\tat org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)\n\tat org.elasticsearch.action.ActionListener$RunAfterActionListener.onFailure(ActionListener.java:350)\n\tat org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)\n\tat org.elasticsearch.action.bulk.Retry$RetryHandler.onFailure(Retry.java:123)\n\tat io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:175)\n\t... 5 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: Failed to execute bulk request due to 'java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://host.docker.internal:9200, response=HTTP/1.1 200 OK}' after 6 attempt(s)\n\tat io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:165)\n\tat io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:119)\n\tat io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:426)\n\tat io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:169)\n\t... 5 more\nCaused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://host.docker.internal:9200, response=HTTP/1.1 200 OK}\n\tat org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2190)\n\tat org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)\n\tat org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)\n\tat org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:620)\n\tat io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$0(ElasticsearchClient.java:171)\n\tat io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:158)\n\t... 8 more\nCaused by: java.lang.NullPointerException\n\tat java.base/java.util.Objects.requireNonNull(Objects.java:221)\n\tat org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:116)\n\tat org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:43)\n\tat org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:28)\n\tat org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:96)\n\tat org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:93)\n\tat org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:148)\n\tat org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:184)\n\tat org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)\n\tat org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:2105)\n\tat org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2188)\n\t... 13 more\n"
    }
  ],
  "type": "sink"
}
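Since the trace is a single JSON string with escaped newlines, it is hard to read as posted. Here is a minimal Python sketch that reduces a status document of this shape to its chain of "Caused by" lines. The function name `caused_by_chain` and the shortened `SAMPLE` are only illustrative (they are not part of Kafka Connect), and I'm assuming the status JSON has the same fields as the output above.

```python
import json

CAUSED_BY = "Caused by: "

def caused_by_chain(status_json):
    """Return the top-level exception line followed by each 'Caused by:' entry
    for every FAILED task in a connector status document."""
    status = json.loads(status_json)
    chain = []
    for task in status.get("tasks", []):
        trace = task.get("trace")
        if task.get("state") != "FAILED" or not trace:
            continue
        lines = trace.split("\n")
        # The first line is the outermost exception.
        chain.append(lines[0])
        # Every later line starting with 'Caused by: ' is a nested cause.
        chain.extend(
            line[len(CAUSED_BY):] for line in lines[1:] if line.startswith(CAUSED_BY)
        )
    return chain

# Shortened, illustrative sample with the same shape as the status output above.
SAMPLE = json.dumps({
    "name": "elasticsearch-sink-from-kafka",
    "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
    "tasks": [{
        "id": 0,
        "state": "FAILED",
        "worker_id": "kafka-connect:8083",
        "trace": (
            "org.apache.kafka.connect.errors.ConnectException: "
            "Exiting WorkerSinkTask due to unrecoverable exception.\n"
            "\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)\n"
            "Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed\n"
            "\tat io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:398)\n"
            "Caused by: java.lang.NullPointerException\n"
            "\tat java.base/java.util.Objects.requireNonNull(Objects.java:221)\n"
            "\tat org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:116)\n"
        ),
    }],
    "type": "sink",
})

for entry in caused_by_chain(SAMPLE):
    print(entry)
```

Applied to the full trace above, the last entry is the innermost cause: the java.lang.NullPointerException thrown from DocWriteResponse.<init> while the client parses the bulk response.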
Can someone help me? Thank you very much in advance.