I am trying to develop a Kafka Connect transform for Confluent's Elasticsearch Sink connector.
The transform has to take data from a document that already exists in an Elasticsearch index and insert it as a field in the current record.
I want to use the Elasticsearch High Level REST Client to run an ES query inside the transform and append the result to the record as a new field.
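A minimal, stdlib-only sketch of the intended data flow, assuming a schemaless record whose value is a Map. The lookup function is a hypothetical stand-in for the Elasticsearch query, and the names EnrichSketch, enrich, user_id and user are illustrative only, not taken from the actual transform.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class EnrichSketch {

    // Returns a copy of a schemaless record value with the looked-up document stored under
    // newFieldName. The lookup function is a hypothetical stand-in for the ES query.
    public static Map<String, Object> enrich(Map<String, Object> value,
                                             String lookupKeyField,
                                             String newFieldName,
                                             Function<Object, Map<String, Object>> lookup) {
        Map<String, Object> result = new HashMap<>(value); // never mutate the incoming value
        Object key = value.get(lookupKeyField);
        if (key == null) {
            return result; // nothing to look up, so the record passes through unchanged
        }
        Map<String, Object> existingDoc = lookup.apply(key);
        if (existingDoc != null) {
            result.put(newFieldName, existingDoc);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical in-memory "index" playing the role of Elasticsearch.
        Map<String, Map<String, Object>> index = new HashMap<>();
        Map<String, Object> doc = new HashMap<>();
        doc.put("name", "Alice");
        index.put("42", doc);

        Map<String, Object> record = new HashMap<>();
        record.put("user_id", "42");
        record.put("event", "login");

        Map<String, Object> enriched = enrich(record, "user_id", "user", index::get);
        System.out.println(enriched.get("user")); // prints {name=Alice}
    }
}
```

In a real transformation, the enriched map would then be handed to the record's newRecord(...) method to build the output record; that wiring and the actual High Level REST Client call are deliberately left out of this sketch.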
I built it with Maven. I wrote the transform and also unit tests that do the same thing as described above, and the tests pass.
I need to add this transform to my Kafka Connect Docker image, which also has the ES sink connector installed.
- When I use this jar as-is and run a connector, the connector fails with the error below. Since the jar is not a fat jar/uber jar, I assumed some dependencies were missing.
java.lang.NoClassDefFoundError: org/elasticsearch/index/query/QueryBuilder
    at io.mosip.kafka.connect.transforms.DynamicNewField.configure(DynamicNewField.java:184)
    at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:284)
    at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:631)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:555)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1287)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$17(DistributedHerder.java:1300)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.index.query.QueryBuilder
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    ... 10 more
- So I built a fat jar with maven-shade-plugin, but when I run the connector with this transform, it fails with this error:
java.util.ServiceConfigurationError: org.elasticsearch.plugins.spi.NamedXContentProvider: org.elasticsearch.search.aggregations.matrix.spi.MatrixStatsNamedXContentProvider not a subtype
    at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:589)
    at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1237)
    at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1265)
    at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300)
    at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385)
    at org.elasticsearch.client.RestHighLevelClient.getProvidedNamedXContents(RestHighLevelClient.java:1980)
    at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:302)
    at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:287)
    at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:279)
    at io.mosip.kafka.connect.transforms.DynamicNewField$ESQueryConfig.<init>(DynamicNewField.java:77)
    at io.mosip.kafka.connect.transforms.DynamicNewField.configure(DynamicNewField.java:184)
    at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:284)
    at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:631)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:555)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1287)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$17(DistributedHerder.java:1300)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
- Is something wrong with my uberjar creation?
- My unit tests pass, so I wrote a main class that executes the same code as the tests and ran it with
java -cp <path> ..kafka.connect.transforms.Main
That works too, both on my machine and inside the Docker container. It only fails when used as a transform with the connector.
- I removed the uber jar step and went back to a normal jar, but this time I copied all the dependency jars (using mvn dependency:copy-dependencies) into the same folder in the Docker image and exported CLASSPATH. When I rerun the connector, it still gives the same error. I also reran my dummy Main class, and it works in this setup as well.
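To narrow down where each class comes from inside the Connect worker, a small stdlib-only diagnostic along these lines might help. It checks whether a class name resolves through a given loader, reports which jar a class was loaded from, compares a class's defining loader with the thread context class loader, and lists the META-INF/services files for a service interface. The class and method names are hypothetical; the two Elasticsearch class names come from the stack traces above. Per the java.util.ServiceLoader documentation, ServiceLoader.load(Class) without an explicit loader uses the thread context class loader, so comparing the two loaders seems relevant to the "not a subtype" error, though that is an assumption rather than a confirmed diagnosis.

```java
import java.io.IOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.Collections;
import java.util.List;

public class ClassLoaderDiagnostics {

    // True if className resolves through the given loader; the class is not initialized.
    // A null loader means the bootstrap loader, as with Class.forName.
    public static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // The jar or directory a class was loaded from; bootstrap classes have no CodeSource.
    public static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "<bootstrap or unknown>";
        }
        return src.getLocation().toString();
    }

    // Every META-INF/services/<serviceInterface> file visible to the loader.
    // A null loader falls back to the system class loader, as ServiceLoader does.
    public static List<URL> serviceFiles(String serviceInterface, ClassLoader loader) throws IOException {
        ClassLoader l = (loader != null) ? loader : ClassLoader.getSystemClassLoader();
        return Collections.list(l.getResources("META-INF/services/" + serviceInterface));
    }

    // Compares the loader that defined cls with the current thread's context loader.
    public static void report(Class<?> cls) {
        ClassLoader defining = cls.getClassLoader();
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        System.out.println("class:           " + cls.getName());
        System.out.println("loaded from:     " + locationOf(cls));
        System.out.println("defining loader: " + defining);
        System.out.println("context loader:  " + context);
        System.out.println("same loader:     " + (defining == context));
    }

    public static void main(String[] args) throws IOException {
        report(ClassLoaderDiagnostics.class);
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        System.out.println("QueryBuilder visible via context loader: "
                + isVisible("org.elasticsearch.index.query.QueryBuilder", context));
        for (URL u : serviceFiles("org.elasticsearch.plugins.spi.NamedXContentProvider", context)) {
            System.out.println("provider file:   " + u);
        }
    }
}
```

Calling report(DynamicNewField.class) from the transform's configure() method, and running the same main class outside the worker, would show whether the two environments actually use the same loaders.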
Help! Thanks!
Here is the pom.xml for my transform:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>io.mosip</groupId>
  <artifactId>dynamic_new</artifactId>
  <version>0.1.0</version>
  <name>dynamic_new</name>
  <url>https://mosip.io</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter</artifactId>
      <version>5.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-high-level-client</artifactId>
      <version>7.9.3</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-clean-plugin</artifactId>
        <version>3.1.0</version>
      </plugin>
      <plugin>
        <artifactId>maven-resources-plugin</artifactId>
        <version>3.0.2</version>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.0</version>
      </plugin>
      <plugin>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.22.1</version>
      </plugin>
      <plugin>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.0.2</version>
      </plugin>
      <plugin>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <relocations>
                <relocation>
                  <pattern>org.apache.http</pattern>
                  <shadedPattern>hidden.org.apache.http</shadedPattern>
                </relocation>
                <relocation>
                  <pattern>org.apache.logging</pattern>
                  <shadedPattern>hidden.org.apache.logging</shadedPattern>
                </relocation>
                <relocation>
                  <pattern>org.apache.commons.codec</pattern>
                  <shadedPattern>hidden.org.apache.commons.codec</shadedPattern>
                </relocation>
                <relocation>
                  <pattern>org.apache.commons.logging</pattern>
                  <shadedPattern>hidden.org.apache.commons.logging</shadedPattern>
                </relocation>
              </relocations>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>module-info.class</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.MF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                    <exclude>META-INF/**</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
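The shade configuration above excludes META-INF/**, a pattern that also matches META-INF/services/, the directory java.util.ServiceLoader reads provider registrations from. A quick way to check what actually ended up in the built jar (for example target/dynamic_new-0.1.0.jar, going by the artifactId and version in this pom) is to list its service files with java.util.jar.JarFile. The class name ServiceEntries is hypothetical, and whether the missing files matter for this error is an open question, not something this sketch establishes.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class ServiceEntries {

    // Returns the names of all META-INF/services/ files packaged in the given jar.
    public static List<String> list(String jarPath) throws IOException {
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry e = entries.nextElement();
                if (!e.isDirectory() && e.getName().startsWith("META-INF/services/")) {
                    names.add(e.getName());
                }
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java ServiceEntries target/dynamic_new-0.1.0.jar
        for (String name : list(args[0])) {
            System.out.println(name);
        }
    }
}
```

An empty listing for the shaded jar would be consistent with the META-INF/** exclude; comparing it against the unshaded elasticsearch jars shows which provider files were dropped.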
Here is my Dockerfile:
FROM confluentinc/cp-kafka-connect
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.2
ADD ./jars /usr/share/java/my_transforms
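Kafka Connect isolates plugins by location: each directory or uber jar directly under a plugin.path entry is treated as one plugin with its own class loader. With this Dockerfile, /usr/share/java/my_transforms and the Elasticsearch connector installed by confluent-hub would therefore be separate plugins, assuming both parent directories are on plugin.path (the image's exact CONNECT_PLUGIN_PATH is an assumption here). The stdlib-only sketch below roughly approximates that grouping for one plugin.path directory, so it is easy to see which jars would share a loader; the class name PluginLayout is hypothetical and this is a simplification, not Connect's actual scanning code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PluginLayout {

    // Rough approximation of Connect's plugin grouping: every subdirectory or top-level jar
    // under one plugin.path entry is treated as one plugin with its own class loader.
    // The value lists the jars (searched recursively) that would share that loader.
    public static Map<String, List<String>> plugins(Path pluginPathEntry) throws IOException {
        Map<String, List<String>> result = new TreeMap<>();
        try (Stream<Path> children = Files.list(pluginPathEntry)) {
            for (Path child : (Iterable<Path>) children::iterator) {
                String name = child.getFileName().toString();
                if (Files.isDirectory(child)) {
                    try (Stream<Path> walk = Files.walk(child)) {
                        result.put(name, walk
                                .filter(p -> p.toString().endsWith(".jar"))
                                .map(p -> child.relativize(p).toString())
                                .sorted()
                                .collect(Collectors.toList()));
                    }
                } else if (name.endsWith(".jar")) {
                    result.put(name, Collections.singletonList(name));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Example: java PluginLayout /usr/share/java
        for (Map.Entry<String, List<String>> e : plugins(Paths.get(args[0])).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```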
Git repo for the transform, the Docker setup, etc.: https://github.com/lalithkota/mosip-infra/tree/1.2.0_v3/build/es-kafka-connecter