How to connect Elasticsearch correctly in a Kafka connector?

Step 1 - Deploy Elasticsearch in Kubernetes (succeeded)

I deployed Elasticsearch in Kubernetes by following the tutorial, using this manifest:

apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: hm-elasticsearch
  namespace: elastic
spec:
  version: 7.14.0
  nodeSets:
    - name: default
      count: 1
      config:
        node.store.allow_mmap: false

Following the tutorial, I can call it successfully by passing the username elastic and the password passw0rd as basic auth credentials. Note that -k tells curl to skip TLS certificate verification:

curl -u "elastic:passw0rd" -k "https://hm-elasticsearch-es-http.elastic:9200"

which returns

{
    "name": "hm-elasticsearch-es-default-0",
    "cluster_name": "hm-elasticsearch",
    "cluster_uuid": "TWgIk0YGR_GVr7IJZcW62g",
    "version": {
        "number": "7.14.0",
        "build_flavor": "default",
        "build_type": "docker",
        "build_hash": "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
        "build_date": "2021-07-29T20:49:32.864135063Z",
        "build_snapshot": false,
        "lucene_version": "8.9.0",
        "minimum_wire_compatibility_version": "6.8.0",
        "minimum_index_compatibility_version": "6.0.0-beta1"
    },
    "tagline": "You Know, for Search"
}
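
For reference, ECK keeps the generated password of the elastic user in a Kubernetes secret. Below is a minimal sketch for reading it, assuming ECK's default secret name `<cluster>-es-elastic-user`. The function name `get_elastic_password` is made up for illustration.

```shell
# Print the password of the built-in "elastic" user of an ECK-managed cluster.
# Assumption: ECK's default secret "<cluster>-es-elastic-user", whose key
# "elastic" holds the base64-encoded password.
get_elastic_password() {
  local cluster="$1" namespace="$2"
  kubectl get secret "${cluster}-es-elastic-user" \
    --namespace "$namespace" \
    --output jsonpath='{.data.elastic}' | base64 --decode
}
```

For the cluster above this would be called as `get_elastic_password hm-elasticsearch elastic`.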

Step 2 - Add ElasticsearchSinkConnector (failed)

Now I am trying to add an ElasticsearchSinkConnector, but I am having trouble setting it up.

Based on this ElasticsearchSinkConnector config, I wrote my own config and then tried to validate it with

curl --location --request PUT 'http://hm-connect-cluster-connect-api.kafka:8083/connector-plugins/io.confluent.connect.elasticsearch.ElasticsearchSinkConnector/config/validate' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "elasticsearch-sink",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "roles",
    "connection.url": "https://hm-elasticsearch-es-http.elastic:9200",
    "connection.username": "elastic",
    "connection.password": "tD23h49YN34ouji74E4kt2ow",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "key.ignore": "false",
    "type.name": "role",
    "behavior.on.null.values": "delete"
}'

It returns the error

{
    "name": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "error_count": 3,
    "groups": [
        "Common",
        "Transforms",
        "Predicates",
        "Error Handling",
        "Transforms: unwrap",
        "Transforms: key",
        "Connector",
        "Data Conversion",
        "Proxy",
        "Security",
        "Kerberos",
        "Data Stream"
    ],
    "configs": [
        // ...
        {
            "definition": {
                "name": "connection.url",
                "type": "LIST",
                "required": true,
                "default_value": null,
                "importance": "HIGH",
                "documentation": "The comma-separated list of one or more Elasticsearch URLs, such as ``http://eshost1:9200,http://eshost2:9200`` or ``https://eshost3:9200``. HTTPS is used for all connections if any of the URLs starts with ``https:``. A URL without a protocol is treated as ``http``.",
                "group": "Connector",
                "width": "LONG",
                "display_name": "Connection URLs",
                "dependents": [],
                "order": 1
            },
            "value": {
                "name": "connection.url",
                "value": "https://hm-elasticsearch-es-http.elastic:9200",
                "recommended_values": [],
                "errors": [
                    "Could not connect to Elasticsearch. Error message: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target"
                ],
                "visible": true
            }
        },
        {
            "definition": {
                "name": "connection.username",
                "type": "STRING",
                "required": false,
                "default_value": null,
                "importance": "MEDIUM",
                "documentation": "The username used to authenticate with Elasticsearch. The default is the null, and authentication will only be performed if  both the username and password are non-null.",
                "group": "Connector",
                "width": "SHORT",
                "display_name": "Connection Username",
                "dependents": [],
                "order": 2
            },
            "value": {
                "name": "connection.username",
                "value": "elastic",
                "recommended_values": [],
                "errors": [
                    "Could not authenticate the user. Check the 'connection.username' and 'connection.password'. Error message: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target"
                ],
                "visible": true
            }
        },
        {
            "definition": {
                "name": "connection.password",
                "type": "PASSWORD",
                "required": false,
                "default_value": null,
                "importance": "MEDIUM",
                "documentation": "The password used to authenticate with Elasticsearch. The default is the null, and authentication will only be performed if  both the username and password are non-null.",
                "group": "Connector",
                "width": "SHORT",
                "display_name": "Connection Password",
                "dependents": [],
                "order": 3
            },
            "value": {
                "name": "connection.password",
                "value": "[hidden]",
                "recommended_values": [],
                "errors": [
                    "Could not authenticate the user. Check the 'connection.username' and 'connection.password'. Error message: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target"
                ],
                "visible": true
            }
        },
        // ...
    ]
}

The full verification response with all fields can be found at hm-connect-cluster-connect-api verify error · GitHub

Then I found the document "Elasticsearch Connector with Security" in the Confluent Documentation, which lists more config options for SSL:

"elastic.security.protocol": "SSL"
"elastic.https.ssl.keystore.location": "/home/directory/elasticsearch-6.6.0/config/certs/keystore.jks"
"elastic.https.ssl.keystore.password": "asdfasdf"
"elastic.https.ssl.key.password": "asdfasdf"
"elastic.https.ssl.keystore.type": "JKS"
"elastic.https.ssl.truststore.location": "/home/directory/elasticsearch-6.6.0/config/certs/truststore.jks"
"elastic.https.ssl.truststore.password": "asdfasdf"
"elastic.https.ssl.truststore.type": "JKS"
"elastic.https.ssl.protocol": "TLS"

In my case, I only need to provide a username and password to succeed. But the config above has no place for a username, so I am not sure how to write it correctly.
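
The SSL settings and the basic auth settings are separate keys, so in principle they can sit side by side in one connector config. Here is a rough sketch of the same validate call with both, assuming a JKS truststore containing the Elasticsearch certificate already exists at a path the Connect worker can read. The function name, the truststore arguments, and the `ES_PASSWORD` variable are placeholders, the transform settings are left out for brevity, and whether the connector also needs the keystore keys has not been verified here.

```shell
# Validate a sink config that combines basic auth with a truststore.
# Assumptions: $1 is a JKS truststore path readable by the Connect worker,
# $2 is its password, and ES_PASSWORD holds the "elastic" user's password.
validate_es_sink() {
  local truststore="$1" truststore_password="$2"
  curl --silent --request PUT \
    --header 'Content-Type: application/json' \
    --data @- \
    'http://hm-connect-cluster-connect-api.kafka:8083/connector-plugins/io.confluent.connect.elasticsearch.ElasticsearchSinkConnector/config/validate' <<EOF
{
  "name": "elasticsearch-sink",
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "1",
  "topics": "roles",
  "connection.url": "https://hm-elasticsearch-es-http.elastic:9200",
  "connection.username": "elastic",
  "connection.password": "${ES_PASSWORD}",
  "elastic.security.protocol": "SSL",
  "elastic.https.ssl.truststore.location": "${truststore}",
  "elastic.https.ssl.truststore.password": "${truststore_password}",
  "elastic.https.ssl.truststore.type": "JKS"
}
EOF
}
```

If the truststore is accepted, the PKIX errors on connection.url, connection.username and connection.password should disappear from the validation response.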

Based on the deployment method in step 1, is it using a keystore or a truststore?
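
Some background on that question: the PKIX error means the JVM running Kafka Connect cannot build a trusted chain to the certificate Elasticsearch presents. By default ECK issues that certificate from its own self-signed CA. A truststore holds the certificates a client trusts, while a keystore holds the client's own key and certificate and only matters for client certificate authentication. Below is a minimal sketch for building a truststore, assuming ECK's default `<cluster>-es-http-certs-public` secret. The function names, the alias, and the file names are illustrative, and making the file available to the Connect pods is not covered.

```shell
# Export the public HTTP certificate that ECK generated for the cluster.
# Assumption: ECK's default secret "<cluster>-es-http-certs-public" with the
# base64-encoded certificate under the key "tls.crt".
export_es_cert() {
  local cluster="$1" namespace="$2" out="$3"
  kubectl get secret "${cluster}-es-http-certs-public" \
    --namespace "$namespace" \
    --output jsonpath='{.data.tls\.crt}' | base64 --decode > "$out"
}

# Import the certificate into a JKS truststore (created if it does not exist).
# The alias "elasticsearch" is an arbitrary label chosen for this sketch.
build_truststore() {
  local cert="$1" store="$2" pass="$3"
  keytool -importcert -noprompt -alias elasticsearch \
    -file "$cert" -keystore "$store" -storepass "$pass" -storetype JKS
}
```

A possible invocation is `export_es_cert hm-elasticsearch elastic tls.crt` followed by `build_truststore tls.crt truststore.jks changeit`, where `changeit` stands in for a real password.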

I can no longer edit the original question, so I posted the same question with more information on Stack Overflow: "How to connect Elasticsearch deployed by Elastic Operator correctly in a Kafka connector?"

I finally got it working! I posted the solution on the same Stack Overflow question.
Hope it helps future readers!
