Save Spark Dataframe into ES - Can't handle type exception


(eliasah) #1

I have designed a simple job to read data from MySQL and save it in Elasticsearch with Spark.

Here is the code :

    JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setAppName("MySQLtoEs")
                    .set("es.index.auto.create", "true")
                    .set("es.nodes", "127.0.0.1:9200")
                    .set("es.mapping.id", "id")
                    .set("spark.serializer", KryoSerializer.class.getName()));

    SQLContext sqlContext = new SQLContext(sc);

    // Data source options
    Map<String, String> options = new HashMap<>();
    options.put("driver", MYSQL_DRIVER);
    options.put("url", MYSQL_CONNECTION_URL);
    options.put("dbtable", "OFFERS");
    options.put("partitionColumn", "id");
    options.put("lowerBound", "10001");
    options.put("upperBound", "499999");
    options.put("numPartitions", "10");

    // Load MySQL query result as DataFrame
    LOGGER.info("Loading DataFrame");
    DataFrame jdbcDF = sqlContext.load("jdbc", options);
    DataFrame df = jdbcDF.select("id", "title", "description",
            "merchantId", "price", "keywords", "brandId", "categoryId");
    df.show();
    LOGGER.info("df.count : " + df.count());
    EsSparkSQL.saveToEs(df, "offers/product");

As you can see, the code is very straightforward: it reads the data into a DataFrame, selects some columns and then runs a count as a basic action on the DataFrame. Up to this point everything works fine!

Then it tries to save the data into Elasticsearch, but it fails because it cannot handle some type. You can find the error log here.

I'm not sure about why it can't handle that type.

I'm using Apache Spark 1.5.0, Elasticsearch 1.4.4 and elasticsearch-hadoop 2.1.1.

Thanks in advance for your help!


(Costin Leau) #2

Spark 1.5.0 had some internal changes; please try the dev builds (those from 2.1.x should work just fine).

P.S. Don't embed JS scripts but rather links. The former are not executed for security reasons.


(eliasah) #3

Thanks for your reply! I have tried what you suggested, but it still gives me exactly the same error stack.

I have also checked the jar to make sure it takes the correct dependency:

$> mvn dependency:list | grep elasticsearch
[INFO]    org.elasticsearch:elasticsearch-hadoop:jar:2.1.2.BUILD-SNAPSHOT:compile
[INFO]    org.elasticsearch:elasticsearch:jar:1.4.4:compile

PS: I have replaced the embedded JS with a link to the error logs.


(Costin Leau) #4

Looks like a new internal type was introduced in Spark 1.5. Raised an issue for it here

P.S. The class loading exception is quite weird - likely it occurs because the underlying classloader in spark is being closed.


(eliasah) #5

OK, so there is actually no solution for now, until the next connector update.

Thanks anyway! :smile:


(Costin Leau) #6

There's something else at hand - after quickly looking at the code, the connector actually recognizes the type but it is unclear why it doesn't properly detect it in your case.


(Costin Leau) #7

There's something wrong here.
Can you post your classpath? The class itself should be recognized but it is not, and the ClassNotFoundException might indicate that there are multiple versions of Spark and elasticsearch-spark in your classpath, which would cause classes not to be recognized.
Namely, if bytecode A is loaded by both ClassLoader X and ClassLoader Y, code in X will not recognize A loaded from Y and vice versa.
This might be caused by how the serialization works - namely, a Spark row might be transferred over the wire but loaded in a different classpath with a different Spark version, which could cause the value/object not to be recognized.
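
To make the class loader point above concrete, here is a minimal, self-contained sketch in plain Java. It does not use Spark or the connector: `Payload` and `IsolatingLoader` are hypothetical names invented for this illustration, standing in for "bytecode A" and "ClassLoader Y". The sketch loads the same bytecode a second time through a separate loader and compares the two resulting classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderIdentityDemo {

    /** Hypothetical stand-in for a class that ends up loaded twice (e.g. a Spark type). */
    public static class Payload { }

    /** Loader that defines Payload from raw bytes instead of delegating to its parent. */
    static final class IsolatingLoader extends ClassLoader {
        private final String targetName;
        private final byte[] bytecode;

        IsolatingLoader(String targetName, byte[] bytecode) {
            super(ClassLoaderIdentityDemo.class.getClassLoader());
            this.targetName = targetName;
            this.bytecode = bytecode;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(targetName)) {
                return super.loadClass(name, resolve); // everything else: normal delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    c = defineClass(name, bytecode, 0, bytecode.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    /** Reads the compiled bytes of Payload from the classpath. */
    static byte[] readPayloadBytecode() throws IOException {
        String resource = Payload.class.getName().replace('.', '/') + ".class";
        try (InputStream in = Payload.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("Cannot find " + resource);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toByteArray();
        }
    }

    /** Loads a second copy of Payload through a separate class loader. */
    public static Class<?> loadSecondCopy() throws Exception {
        String name = Payload.class.getName();
        return new IsolatingLoader(name, readPayloadBytecode()).loadClass(name);
    }

    public static void main(String[] args) throws Exception {
        Class<?> copy = loadSecondCopy();
        Object instance = copy.getDeclaredConstructor().newInstance();
        System.out.println("same name:  " + copy.getName().equals(Payload.class.getName()));
        System.out.println("same class: " + (copy == Payload.class));
        System.out.println("isInstance: " + Payload.class.isInstance(instance));
    }
}
```

The two classes share a name and identical bytecode, yet the JVM treats them as unrelated types, so an `instanceof` check or a class comparison against one copy does not match instances of the other. Whether this is what happens in this particular job is still an open question.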

P.S. Can you reproduce the problem on a local install by any chance? It would be great to be able to reproduce this.


(eliasah) #8

I'm not sure which classpath you are talking about. I'm running the application on a local standalone cluster:

$> env | grep SPARK
SPARK_HOME=/home/eliasah/utils/spark/spark-1.5.0-bin-hadoop2.6/

Elasticsearch is running as a service on my local machine as well, as you can see in the SparkConf settings.
MySQL is also running locally.

Here are snippets from my pom.xml

    <properties>
       <maven.compiler.source>1.8</maven.compiler.source>
       <maven.compiler.target>1.8</maven.compiler.target>
       <encoding>UTF-8</encoding>
       <scala.version>2.10.4</scala.version>
       <scala.binary.version>2.10</scala.binary.version>
       <spark.version>1.5.0</spark.version>
    </properties>

    [ ... ]
    
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.35</version>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>2.1.2.BUILD-SNAPSHOT</version>
    </dependency>

(Costin Leau) #9

Can you then please extract a sample of the DataFrame from SQL, save it to disk in a simple portable format (CSV or something), put it in a test that replicates the problem and upload it somewhere?
2-3 lines are okay - I just need a basic sample that raises the exception; the smaller it is, the better.


(eliasah) #10

I have extracted the first 30 rows of the Dataframe which you can find here along with the error log file and the java source code.


(eliasah) #11

I have found the solution, which I posted on SO as well. Here is the link to the question and answer!

Thanks for your help @costin! :smile:

PS: You can tell me what you think about the answer as well!


(Costin Leau) #12

Thanks for the update @eliasah. I think there's something we can improve and support in es-spark directly.


(eliasah) #13

It would be great. Thanks! :smile:


(Costin Leau) #14

I've been reading that thread several times and even tried replicating the issue based on your dataset but I can't.
Initially I thought a new type was added in Spark 1.5 but it turned out that is not the case. They changed the traits and the internal structure but that shouldn't be an issue.
In fact if you use the nightly build things should work just fine.
The solution you used should not be needed, and it indicates the issue is still there - you basically convert the DataFrame into an RDD, but since it contains a rich object (Product) you need to serialize it.

You are basically recreating the DataFrame by designing your own RDD with a schema on top and this shouldn't be needed.
Can you please try the latest 2.2.0.BUILD-SNAPSHOT with your initial setup and report back? It contains some small fixes which might fix the glitches between Spark 1.5 and the previous versions.


(eliasah) #15

I totally understand that, but there is a de-serialization problem somewhere when writing a DataFrame to es.

I've also tried the latest build, without success. It gives me the same issue, whereas the solution I described in my answer on SO works perfectly.


(Costin Leau) #16

Indeed, but I can't seem to find it. I really want to avoid the canned "can't reproduce" response, but I keep trying different variations, including Kryo serialization, to no avail.
Note that the integration tests (as seen here for example) contain plenty of DataFrames created either manually or from existing storage (which is quite complicated) and in all cases, it just works.

If the serialization occurs, it's likely because there is a class mismatch (a different class loader is used or a different version of Spark is loaded somehow). That makes it even weirder if you are running things locally (since no serialization should occur then).

Can you by any chance try the spark console along with ES and see whether the problem occurs? Does it occur even if you are creating the DataFrame by hand or from a simple json file (basically directly from Spark proper without any addon or plugin)?

Thanks,


(eliasah) #17

I have a lead on the de-serialization problem, but I'm not sure whether I should open an issue about it.

It seems to be related to how DataFrameValueWriter handles unknown types, DecimalType in particular. In fact, the error log is misleading: the EsHadoopSerializationException thrown here is never shown.

Instead, the log shows the following:

2015-10-29 10:41:24 ERROR TaskResultGetter:75 - Could not deserialize TaskEndReason: ClassNotFound with classloader org.apache.spark.util.MutableURLClassLoader@59e5ddf
2015-10-29 10:41:24 WARN  ThrowableSerializationWrapper:166 - Task exception could not be deserialized
java.lang.ClassNotFoundException: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  [...]

The error log is quite long, but I can send it to you in case you need it.
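
The masking effect visible in the log above can be sketched in plain Java, without Spark or the connector. This is only an illustration under stated assumptions: `ConnectorOnlyException` and `RestrictedInput` are hypothetical names, and the "receiver that cannot see the class" is simulated by refusing to resolve one class name during deserialization. It is not how Spark's TaskResultGetter is implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

public class MaskedExceptionDemo {

    /** Hypothetical stand-in for an exception class that only the sending side can load. */
    public static class ConnectorOnlyException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        public ConnectorOnlyException(String message) {
            super(message);
        }
    }

    /** Stream that simulates a receiver whose class loader cannot see one class. */
    static final class RestrictedInput extends ObjectInputStream {
        private final String hiddenClassName;

        RestrictedInput(InputStream in, String hiddenClassName) throws IOException {
            super(in);
            this.hiddenClassName = hiddenClassName;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (desc.getName().equals(hiddenClassName)) {
                throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
        }
    }

    /** Serializes the failure as the sending side would before shipping it. */
    static byte[] serialize(Throwable failure) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(failure);
        }
        return bytes.toByteArray();
    }

    /** Returns what the receiving side ends up seeing for the given failure. */
    public static Throwable receive(Throwable failure) throws IOException {
        byte[] wire = serialize(failure);
        String hidden = failure.getClass().getName();
        try (ObjectInputStream in = new RestrictedInput(new ByteArrayInputStream(wire), hidden)) {
            return (Throwable) in.readObject();
        } catch (ClassNotFoundException e) {
            return e; // the original failure and its message are lost at this point
        }
    }

    public static void main(String[] args) throws IOException {
        Throwable seen = receive(new ConnectorOnlyException("original failure detail"));
        System.out.println("receiver sees: " + seen);
    }
}
```

In this sketch the receiver reports only a ClassNotFoundException naming the exception class, and the original message never surfaces, which matches the shape of the log above.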


(Costin Leau) #18

I fail to see the relationship between the two. Are you using a DecimalType by any chance? If the writer fails, its exception should bubble up, but the typical "Cannot handle type" exception is returned instead, which indicates that the unknown type / Decimal case is not executed.


(Costin Leau) #19

I stand corrected. You are using a DecimalType, and the exception likely does not occur because the class comparison does not match.
I'll look into it - at the very least the exception should be properly triggered.


(eliasah) #20

Great! Thank you :slight_smile: