We have a Kafka Connect Elasticsearch sink connector that writes data to Elasticsearch (v5.6.3). The connector configuration is as follows:
{
  "name": "elasticsearch_topic",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "sample_topic",
    "connection.url": "https://127.0.0.1:9200,https://127.0.0.2:9200",
    "connection.username": "elsatic_user",
    "connection.password": "elastic_user",
    "type.name": "log",
    "flush.timeout.ms": "60000",
    "connection.timeout.ms": "60000",
    "read.timeout.ms": "60000",
    "batch.size": "20",
    "topic.index.map": "sample_topic:elastic_search_index_test",
    "transforms": "extract,insertenv,inserttimestamp,convert_current_ts,routeTS",
    "schema.ignore": "true",
    "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extract.field": "RE_NUM",
    "transforms.insertenv.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertenv.static.field": "_env",
    "transforms.insertenv.static.value": "dev",
    "transforms.inserttimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.inserttimestamp.timestamp.field": "date_time",
    "transforms.convert_current_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convert_current_ts.target.type": "Timestamp",
    "transforms.convert_current_ts.field": "date_time",
    "transforms.convert_current_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
    "transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.routeTS.topic.format": "elastic_search_index_test-${timestamp}",
    "transforms.routeTS.timestamp.format": "yyyyMMdd"
  }
}
Up to this point everything worked without issues.
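For context, this is how I understand the routeTS transform in the configuration above to derive the target index name. It is only a rough Python sketch, not the actual Kafka Connect code: the function name `routed_index_name` is made up for illustration, and I am assuming the router formats the record timestamp in UTC and substitutes the `${topic}` and `${timestamp}` placeholders.

```python
from datetime import datetime, timezone


def routed_index_name(topic_format: str, topic: str, timestamp_ms: int) -> str:
    """Approximate the routing step: fill ${topic} and ${timestamp} placeholders.

    Assumption: the timestamp is epoch milliseconds and is formatted in UTC,
    with the Java pattern "yyyyMMdd" mapped to strftime's "%Y%m%d".
    """
    day = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).strftime("%Y%m%d")
    return topic_format.replace("${topic}", topic).replace("${timestamp}", day)


if __name__ == "__main__":
    # Record timestamp taken from the time of the error in the log.
    ts = int(datetime(2018, 12, 6, 3, 36, 21, tzinfo=timezone.utc).timestamp() * 1000)
    print(routed_index_name("elastic_search_index_test-${timestamp}", "sample_topic", ts))
```

If that reading is right, records should end up in daily indices such as `elastic_search_index_test-20181206`, while the index named in the error is the plain `elastic_search_index_test` from `topic.index.map`.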
Recently we enabled SSL on Elasticsearch, and for this I added the "username", "password" and "https" settings to the configuration above. I then restarted the connector and the worker. Since then I see an "index_already_exists_exception" with the following error:
[2018-12-06 03:36:21,487] ERROR WorkerSinkTask{id=elasticsearch_topic-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Could not create index 'elastic_search_index_test': {"root_cause": [{"type":"index_already_exists_exception","reason":"index [elastic_search_index_test/QVgWV8E7RmuSArtIJt3m3g] already exists","index_uuid":"QVgWV8E7RmuSArtIJt3m3g","index":"elastic_search_index_test"}],"type":"index_already_exists_exception","reason":"index [elastic_search_index_test/QVgWV8E7RmuSArtIJt3m3g] already exists","index_uuid":"QVgWV8E7RmuSArtIJt3m3g","index":"elastic_search_index_test"}
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:238)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:330)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:157)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-12-06 03:36:21,487] ERROR WorkerSinkTask{id=elasticsearch_topic-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
Steps I've tried so far:
1. Stopped the Elasticsearch sink connector and the worker.
2. Deleted the index "elastic_search_index_test" from Elasticsearch (through Kibana).
3. Restarted the worker and the Elasticsearch connector.
After this I still get the same error as described above.
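The delete-and-verify step could also be done against the Elasticsearch REST API directly rather than through Kibana. Below is a minimal Python sketch of that check, assuming HTTP Basic authentication and a server certificate the client already trusts. The helper names (`basic_auth_header`, `build_request`, `index_exists`) are hypothetical, and the credentials in the usage lines are placeholders.

```python
import base64
import urllib.error
import urllib.request


def basic_auth_header(username: str, password: str) -> str:
    """Build the value of an HTTP Basic Authorization header."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return "Basic " + token


def build_request(base_url: str, index: str, method: str,
                  username: str, password: str) -> urllib.request.Request:
    """Prepare an authenticated request against /<index> (HEAD, DELETE, ...)."""
    req = urllib.request.Request(f"{base_url.rstrip('/')}/{index}", method=method)
    req.add_header("Authorization", basic_auth_header(username, password))
    return req


def index_exists(base_url: str, index: str, username: str, password: str) -> bool:
    """HEAD /<index>: 200 means the index exists, 404 means it does not."""
    req = build_request(base_url, index, "HEAD", username, password)
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            return resp.status == 200
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return False
        raise  # any other status (e.g. 401/403) is surfaced to the caller


if __name__ == "__main__":
    # Placeholder credentials; replace with the real ones before running.
    print(index_exists("https://127.0.0.1:9200", "elastic_search_index_test",
                       "user", "pass"))
```

Running the check with the same user the connector uses, right after deleting the index, should show whether that user sees the index as missing. A 401 or 403 response instead of 200 or 404 would suggest looking at the credentials or permissions rather than at the index.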
Could anyone suggest what's wrong here?
Thanks in advance!!