Elasticsearch connector error

I have this error with the elasticsearch connector. Do you know why?

`name	"elasticsearch-sink"`
`connector	`
`state	"RUNNING"`
`worker_id	"xxx.xxx.xxx.xxx:8083"`
`tasks	`
`0	`
`id	0`
`state	"FAILED"`
`worker_id	"xxx.xxx.xxx.xxx:8083"`
`trace	"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: ` `javax.net` `.ssl.SSLProtocolException: Connection reset\n\tat io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.indexExists(JestElasticsearchClient.java:284)\n\tat io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:290)\n\tat io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:255)\n\tat io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:169)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 more\nCaused by: ` `javax.net` `.ssl.SSLProtocolException: Connection reset\n\tat java.base/sun.security.ssl.Alert.createSSLException(Alert.java:126)\n\tat java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321)\n\tat java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:264)\n\tat java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:259)\n\tat java.base/sun.security.ssl.SSLSocketImpl.handleException(SSLSocketImpl.java:1314)\n\tat java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:839)\n\tat ` `org.apache.http.impl.io` `.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)\n\tat ` `org.apache.http.impl.io` `.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)\n\tat ` `org.apache.http.impl.io` `.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282)\n\tat org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)\n\tat org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)\n\tat ` `org.apache.http.impl.io` `.AbstractMessageParser.parse(AbstractMessageParser.java:259)\n\tat org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)\n\tat org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165)\n\tat org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)\n\tat org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)\n\tat org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)\n\tat org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)\n\tat org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)\n\tat org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111)\n\tat org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)\n\tat org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)\n\tat io.searchbox.client.http.JestHttpClient.executeRequest(JestHttpClient.java:133)\n\tat io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:70)\n\tat io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:63)\n\tat io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.indexExists(JestElasticsearchClient.java:281)\n\t... 14 more\nCaused by: ` `java.net` `.SocketException: Connection reset\n\tat java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)\n\tat java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)\n\tat java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:448)\n\tat java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:68)\n\tat java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1104)\n\tat java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:823)\n\t... 34 more\n"`

This is the configuration:

`name    "elasticsearch-sink"`
`config    `
`connector.class    "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"`
`type.name    "kafka-connect"`
`connection.password    ""`
`tasks.max    "1"`
`connection.timeout.ms    "30000"`
`key.ignore    "true"`
`internal.key.converter.schemas.enable    "false"`
`elastic.https.ssl.truststore.password    "password"`
`value.converter    "io.confluent.connect.avro.AvroConverter"`
`read.timeout.ms    "30000"`
`key.converter    "io.confluent.connect.avro.AvroConverter"`
`elastic.https.ssl.truststore.type    "JKS"`
`topics    "uuu.test.kkk,uuu.test.vvv,uuu.test.ppp,uuu.test.ooo,uuu.test.vvv,uuu.test.fff,uuu.test.qqq"`
`connection.username    "elastic"`
`value.converter.schema.registry.url    "` `http://xxx.xxx.xxx.xxx:8081` `,` `http://xxx.xxx.xxx.xxx:8081` `,` `http://xxx.xxx.xxx.xxx:8081` `"`
`internal.key.converter    "org.apache.kafka.connect.json.JsonConverter"`
`flush.timeout.ms    "30000"`
`elastic.https.ssl.protocol    "TLS"`
`topic.index.map    "uuu.test.kkk:kkkvalue,uuu.test.vvv:vvvvalue,uuu.test.ppp:pppvalue,uuu.test.ooo:oldooovalue,uuu.test.vvv:vvvvalue,uuu.test.fff:fffvalue,uuu.test.qqq:qqqvalue"`
`internal.value.converter.schemas.enable    "false"`
`name    "elasticsearch-sink"`
`internal.value.converter    "org.apache.kafka.connect.json.JsonConverter"`
`connection.url    "` `https://xx.xxx.com:9200` `"`
`key.converter.schema.registry.url    "` `http://xxx.xxx.xxx.xxx:8081` `,` `http://xxx.xxx.xxx.xxx:8081` `,` `http://xxx.xxx.xxx.xxx:8081` `"`
`elastic.https.ssl.truststore.location    "/kafka/test-certs/test.server.truststore.jks"`
`tasks    `
`0    `
`connector    "elasticsearch-sink"`
`task    0`
`type    "sink"`

The connector works for a while and then after some time, I get that error

Hi @xemyleex

I think the problem lies in

`javax.net` `.ssl.SSLProtocolException: Connection reset\n\tat 

For some reason the connection from the Kafka connector to the ES cluster dropped.
If restarting the connector process alone (without doing anything to the ES cluster) fixes the issue, I think this issue is best explored with the maintainers of the connector. It might be an issue the exception handling/retry-implementation of that connector or so that we on the ES end can't do anything about.

To make it work again, I have to delete and register the connector again.
So what should I do?
Thanks :slight_smile:

@xemyleex

That's unfortunately a problem I don't know how to help you with. You'd have to check with the maintainers of the connector to get help here since the does not seem to be an issue with Elasticsearch itself (if you can fix it by reloading the connector then the ES cluster seems to have been fine).
Maybe it has some settings around connection retrying that you can tweak? Or maybe it's some bug with it. Either way, I'd continue my investigation at the connector side of things.

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.