Exception when using Elasticsearch-spark and Elasticsearch-core together

Hello, Elastic users.

I am trying to use Elasticsearch-spark and Elasticsearch-core together in my Scala (sbt) Spark app.

Here is the build.sbt:

scalaVersion := "2.10.6"

val sparkVers = "1.5.1"

resolvers ++= Seq(
  "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots")

// Base Spark-provided dependencies
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVers % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVers % "provided")

// Elasticsearch integration
libraryDependencies ++= Seq(
  ("org.elasticsearch" % "elasticsearch-spark_2.10" % "2.2.0-beta1").
    exclude("org.apache.hadoop", "hadoop-yarn-api").
    exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish").
    exclude("org.eclipse.jetty.orbit", "javax.servlet").
    exclude("org.slf4j", "slf4j-api")
)

// Elasticsearch core
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.1.1"

// Skip tests when assembling fat JAR
test in assembly := {}

// Exclude jars that conflict with Spark (see https://github.com/sbt/sbt-assembly)
libraryDependencies ~= { _ map {
  case m if Seq("org.elasticsearch").contains(m.organization) =>
    m.exclude("commons-logging", "commons-logging").
      exclude("commons-collections", "commons-collections").
      exclude("commons-beanutils", "commons-beanutils-core").
      exclude("com.esotericsoftware.minlog", "minlog").
      exclude("joda-time", "joda-time").
      exclude("org.apache.commons", "commons-lang3").
      exclude("com.google.guava", "guava")
  case m => m
}}

dependencyOverrides += "org.scala-lang" % "scala-compiler" % scalaVersion.value
dependencyOverrides += "org.scala-lang" % "scala-library" % scalaVersion.value
dependencyOverrides += "commons-net" % "commons-net" % "3.1"

assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

And the following is my very simple main class.

import java.net.InetAddress

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.spark._ // provides sc.esJsonRDD

object Main {

  def main(args: Array[String]): Unit = {
    // Build a TransportClient for index/cluster management
    val settings = Settings.settingsBuilder.put("cluster.name", "Avengers").put("client.transport.sniff", true).build
    TransportClient.builder.settings(settings).build.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))

    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    // Read every index as JSON and print the documents
    sc.esJsonRDD("*")
      .collect()
      .foreach(println)

    sc.stop()
  }
}

If I build this into a fat JAR (sbt assembly) and spark-submit it to the Spark cluster, I get the following exception:

Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/collect/ImmutableMap
	at Main$.main(Main.scala:13)
	at Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.ImmutableMap
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 11 more

However, without the following lines, everything works fine.

val settings = Settings.settingsBuilder.put("cluster.name", "Avengers").put("client.transport.sniff", true).build
TransportClient.builder.settings(settings).build.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))

Has anyone run into a similar problem?
I need to use TransportClient to manage indices and the cluster while doing MapReduce-style work with esRDD.
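
For example, the kind of index/cluster management I have in mind looks roughly like this (just a sketch; the index name is only an example):

val client = TransportClient.builder.settings(settings).build
  .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))

client.admin.indices.prepareCreate("my-index").get()           // create an index
println(client.admin.cluster.prepareHealth().get().getStatus)  // check cluster health
client.close()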

Is it even possible to use Elasticsearch-spark and Elasticsearch-core together?

Any hint would be much appreciated.
Thank you.

Did you try using the elasticsearch-hadoop development snapshot?
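
In build.sbt that would be something along these lines, using the Sonatype snapshots resolver you already have (the exact snapshot version below is a guess on my part, so check what is currently published):

libraryDependencies += "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.2.0.BUILD-SNAPSHOT"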


ES-Spark/Hadoop itself does not depend on Guava. Hadoop does, and so does Elasticsearch; however, Hadoop depends on an ancient version of Guava while ES does not. Try upgrading Guava to the same version Elasticsearch uses or, if that doesn't work, try shading it (see the Elastic blog).
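
In sbt, upgrading is a one-liner and shading can be done through sbt-assembly's shade rules. A rough sketch (the Guava version and the shaded package name are just examples, and the shade-rule syntax needs sbt-assembly 0.14.x or later):

// Option 1: force the newer Guava that Elasticsearch expects (version is an example)
dependencyOverrides += "com.google.guava" % "guava" % "18.0"

// Option 2: shade Guava inside the fat jar so it cannot clash with Spark's copy
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.guava.@1").inAll
)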


Thank you for your hints.

I solved this issue by adding

exclude("org.apache.spark", "spark-network-common_2.10")

instead of

exclude("com.google.guava", "guava")

It looks like spark-network-common_2.10 contains Guava classes.
So the final build.sbt includes the following:

// Exclude jars that conflict with Spark (see https://github.com/sbt/sbt-assembly)
libraryDependencies ~= { _ map {
  case m if Seq("org.elasticsearch").contains(m.organization) =>
    m.exclude("commons-logging", "commons-logging").
      exclude("commons-collections", "commons-collections").
      exclude("commons-beanutils", "commons-beanutils-core").
      exclude("com.esotericsoftware.minlog", "minlog").
      exclude("joda-time", "joda-time").
      exclude("org.apache.commons", "commons-lang3").
      exclude("org.apache.spark", "spark-network-common_2.10")
  case m => m
}}
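
By the way, to double-check that spark-network-common_2.10 really bundles Guava classes, a small snippet like this lists them from the jar in the local Ivy cache (the path is from my machine, adjust as needed):

import java.util.jar.JarFile
import scala.collection.JavaConverters._

object ListBundledGuava {
  def main(args: Array[String]): Unit = {
    val jar = new JarFile(sys.env("HOME") +
      "/.ivy2/cache/org.apache.spark/spark-network-common_2.10/jars/spark-network-common_2.10-1.5.1.jar")
    jar.entries().asScala
      .map(_.getName)
      .filter(_.startsWith("com/google/common/"))
      .foreach(println) // any output here means unshaded Guava classes are inside
  }
}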

Thanks for the update.

If you have time, can you raise this with the Spark team? Having unshaded Google Guava classes in a different jar is bad practice, since it makes problems like this hard to track down. From what I can tell, the jar itself contains some com.google.common classes internally (bad) and also declares a dependency on Guava.
