Hello, Elastic users.
I am trying to use elasticsearch-spark and the Elasticsearch core library together in my Scala (sbt) Spark app.
Here is the build.sbt:
scalaVersion := "2.10.6"
val sparkVers = "1.5.1"
resolvers ++= Seq(
  "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots")
// Base Spark-provided dependencies
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVers % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVers % "provided")
// Elasticsearch integration
libraryDependencies ++= Seq(
  ("org.elasticsearch" % "elasticsearch-spark_2.10" % "2.2.0-beta1").
    exclude("org.apache.hadoop", "hadoop-yarn-api").
    exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish").
    exclude("org.eclipse.jetty.orbit", "javax.servlet").
    exclude("org.slf4j", "slf4j-api")
)
// Elasticsearch core
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.1.1"
// Skip tests when assembling fat JAR
test in assembly := {}
// Exclude jars that conflict with Spark (see https://github.com/sbt/sbt-assembly)
libraryDependencies ~= { _ map {
  case m if Seq("org.elasticsearch").contains(m.organization) =>
    m.exclude("commons-logging", "commons-logging").
      exclude("commons-collections", "commons-collections").
      exclude("commons-beanutils", "commons-beanutils-core").
      exclude("com.esotericsoftware.minlog", "minlog").
      exclude("joda-time", "joda-time").
      exclude("org.apache.commons", "commons-lang3").
      exclude("com.google.guava", "guava")
  case m => m
}}
dependencyOverrides += "org.scala-lang" % "scala-compiler" % scalaVersion.value
dependencyOverrides += "org.scala-lang" % "scala-library" % scalaVersion.value
dependencyOverrides += "commons-net" % "commons-net" % "3.1"
assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
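In case it is relevant, the assembly task comes from the sbt-assembly plugin; my project/plugins.sbt is roughly the following (the version shown is just the one I believe I am using):
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")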
And the following is my very simple main class (imports shown for completeness):
import java.net.InetAddress

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.spark._

object Main {
  def main(args: Array[String]): Unit = {
    // Building a TransportClient here is what triggers the crash shown below.
    val settings = Settings.settingsBuilder.put("cluster.name", "Avengers").put("client.transport.sniff", true).build
    TransportClient.builder.settings(settings).build.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    // Read every document from every index as JSON and print it.
    sc.esJsonRDD("*")
      .collect()
      .foreach(println)
    sc.stop()
  }
}
If I build this into a fat JAR with sbt assembly and spark-submit it to the Spark cluster, I get the following exception:
Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/collect/ImmutableMap
at Main$.main(Main.scala:13)
at Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.ImmutableMap
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 11 more
However, without the following two lines, everything works fine:
val settings = Settings.settingsBuilder.put("cluster.name", "Avengers").put("client.transport.sniff", true).build
TransportClient.builder.settings(settings).build.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))
Has anyone run into a similar problem?
I need to use TransportClient to manage indices and the cluster while doing MapReduce-style processing with esRDD.
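For example, this is roughly the kind of index management I want to do alongside the Spark job (the index name is just a placeholder, and the snippet assumes the same settings value as in Main above):
// Illustrative sketch only: create the target index up front if it is missing.
// "my-index" is a placeholder; settings is the Settings object built in Main.
val client = TransportClient.builder.settings(settings).build
  .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))
if (!client.admin.indices.prepareExists("my-index").get.isExists) {
  client.admin.indices.prepareCreate("my-index").get
}
client.close()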
Is it even possible to use elasticsearch-spark and the Elasticsearch core library together?
Any hint would be much appreciated.
Thank you.