I am trying to use the Elasticsearch Java client with Spark Streaming. I am using the Java client to read data for a given document ID.
Here is the code:
/**
 * Serializable holder for an Elasticsearch transport client.
 *
 * The client itself is never constructed eagerly: it is built by calling
 * `createEsClient` the first time [[esClient]] is accessed.
 *
 * @param createEsClient factory invoked once per instance (per JVM after
 *                       deserialization) to build the transport client
 */
class ESConnection(createEsClient: () => TransportClient) extends Serializable {

  // @transient keeps an already-initialised client out of the serialized form of
  // this instance; after deserialization the lazy val is recomputed on first
  // access, so each JVM that receives a copy builds its own client.
  @transient lazy val esClient: TransportClient = createEsClient()

  /**
   * Fetches a single document by id, blocking until the response arrives.
   *
   * @param _index index to read from
   * @param _type  mapping type of the document
   * @param id     id of the document
   * @return the response of the get request issued through [[esClient]]
   */
  def client(_index: String, _type: String, id: String): GetResponse =
    esClient.prepareGet(_index, _type, id).get()
}
object ESConnection {

  /**
   * Builds an [[ESConnection]] whose transport client is created on first use.
   *
   * All parameters default to the values that were previously hard-coded, so
   * `ESConnection()` behaves as before.
   *
   * @param clusterName value written to the `cluster.name` client setting
   * @param host        host name or IP address of the node to connect to
   * @param port        transport port of that node
   * @return a connection wrapping a factory for the configured client
   */
  def apply(
      clusterName: String = "swlm-elasticsearch-stage",
      host: String = "10.66.73.182",
      port: Int = 9300): ESConnection = {
    // Only this factory function is handed to ESConnection; nothing below runs
    // until the connection's client is first accessed.
    val createClient: () => TransportClient = () => {
      val settings = Settings.builder()
        .put("cluster.name", clusterName)
        .build()
      val client = new PreBuiltTransportClient(settings)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port))
      // Close the client when the JVM that created it shuts down.
      sys.addShutdownHook {
        client.close()
      }
      client
    }
    new ESConnection(createClient)
  }
}
I get the two errors below. How can I use the Java API to search for data from a Spark Streaming job?
java.lang.ExceptionInInitializerError
at org.elasticsearch.common.logging.DeprecationLogger.(DeprecationLogger.java:138)
at org.elasticsearch.common.xcontent.support.AbstractXContentParser.(AbstractXContentParser.java:57)
at org.elasticsearch.common.xcontent.json.JsonXContentParser.(JsonXContentParser.java:44)
at org.elasticsearch.common.xcontent.json.JsonXContent.createParser(JsonXContent.java:103)
at org.elasticsearch.common.settings.Setting.parseableStringToList(Setting.java:832)
at org.elasticsearch.common.settings.Setting.lambda$listSetting$27(Setting.java:786)
at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:791)
at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:786)
at org.elasticsearch.common.network.NetworkService.(NetworkService.java:50)
at org.elasticsearch.client.transport.TransportClient.newPluginService(TransportClient.java:98)
at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:126)
at org.elasticsearch.client.transport.TransportClient.(TransportClient.java:268)
at org.elasticsearch.transport.client.PreBuiltTransportClient.(PreBuiltTransportClient.java:125)
at org.elasticsearch.transport.client.PreBuiltTransportClient.(PreBuiltTransportClient.java:111)
at org.elasticsearch.transport.client.PreBuiltTransportClient.(PreBuiltTransportClient.java:101)
at com.tgt.ESConnection$$anonfun$1.apply(ESConnection.scala:31)
at com.tgt.ESConnection$$anonfun$1.apply(ESConnection.scala:23)
at com.tgt.ESConnection.esClient$lzycompute(ESConnection.scala:15)
at com.tgt.ESConnection.esClient(ESConnection.scala:15)
at com.tgt.ESConnection.client(ESConnection.scala:17)
at com.tgt.WlmAggs$$anonfun$main$1$$anonfun$apply$1.apply(WlmAggs.scala:105)
at com.tgt.WlmAggs$$anonfun$main$1$$anonfun$apply$1.apply(WlmAggs.scala:104)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Error finding the build shortHash. Stopping Elasticsearch now so it doesn't run in subtly broken ways. This is likely a build bug.
at org.elasticsearch.Build.(Build.java:62)
... 34 more
17/04/07 15:38:46 ERROR Executor: Exception in task 2.0 in stage 0.0 (TID 19)
java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.common.network.NetworkService
at org.elasticsearch.client.transport.TransportClient.newPluginService(TransportClient.java:98)
at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:126)
at org.elasticsearch.client.transport.TransportClient.(TransportClient.java:268)
at org.elasticsearch.transport.client.PreBuiltTransportClient.(PreBuiltTransportClient.java:125)
at org.elasticsearch.transport.client.PreBuiltTransportClient.(PreBuiltTransportClient.java:111)
at org.elasticsearch.transport.client.PreBuiltTransportClient.(PreBuiltTransportClient.java:101)
at com.tgt.ESConnection$$anonfun$1.apply(ESConnection.scala:31)
at com.tgt.ESConnection$$anonfun$1.apply(ESConnection.scala:23)
at com.tgt.ESConnection.esClient$lzycompute(ESConnection.scala:15)
at com.tgt.ESConnection.esClient(ESConnection.scala:15)
at com.tgt.ESConnection.client(ESConnection.scala:17)
at com.tgt.WlmAggs$$anonfun$main$1$$anonfun$apply$1.apply(WlmAggs.scala:105)
at com.tgt.WlmAggs$$anonfun$main$1$$anonfun$apply$1.apply(WlmAggs.scala:104)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)