Write stream to Elasticsearch failure

Hello.
I have a PySpark streaming pipeline that reads data from a Kafka topic and writes it to an Elasticsearch database. When I run it directly in my Ubuntu environment with `python3.10 stream.py`, it works well and writes the stream to Elasticsearch, but when I try to automate the job with crontab or a remote SSH Airflow task, I get the error below.

Elasticsearch==8.14.0
spark==3.4.1
scala==2.12

```
ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.encoders.ExpressionEncoder org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(org.apache.spark.sql.types.StructType)'
    at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.<init>(EsStreamQueryWriter.scala:50)
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.$anonfun$addBatch$5(EsSparkSqlStreamingSink.scala:72)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
```
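For reference, the pipeline is roughly of this shape (a stripped-down sketch; the bootstrap server, topic, index, and checkpoint path here are invented for illustration):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stream").getOrCreate()

# Read from Kafka (server and topic names are placeholders).
events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "my-topic")
    .load()
    .selectExpr("CAST(value AS STRING) AS value")
)

# Write the stream to Elasticsearch; the "es" format is provided by the
# elasticsearch-spark connector jar on the classpath.
query = (
    events.writeStream
    .format("es")
    .option("checkpointLocation", "/tmp/stream-ckpt")
    .option("es.nodes", "localhost")
    .option("es.port", "9200")
    .start("my-index")
)

query.awaitTermination()
```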

My guess is that your es-spark jar is incompatible with the version of spark that you are using. Both spark and scala pretty frequently break compatibility with upgrades. What version of es-spark are you using? For the versions you give, you'll want elasticsearch-spark-30_2.12-8.14.0.jar.
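If it helps, here is a minimal sketch of pulling the matching connector straight from Maven Central instead of pointing at a local jar; the coordinate below assumes Spark 3.x (the `-30` suffix), Scala 2.12, and Elasticsearch 8.14.0:

```python
from pyspark.sql import SparkSession

# Let Spark resolve the es-spark connector whose version lines up with
# Spark 3.x / Scala 2.12 / Elasticsearch 8.14.0.
spark = (
    SparkSession.builder
    .appName("spark")
    .config(
        "spark.jars.packages",
        "org.elasticsearch:elasticsearch-spark-30_2.12:8.14.0",
    )
    .getOrCreate()
)

print(spark.version)  # should print 3.4.1 for this connector to work
```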


Also, are you sure you're not accidentally using Spark 3.5, or picking up Spark 3.5 libraries somewhere on your classpath? Spark 3.5 is not supported yet. The method in your stack trace is present in Spark 3.4, but it is gone from 3.5.
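A quick way to check which Spark the job actually picks up at runtime (worth running both from your shell and from inside the cron/Airflow task):

```python
import os
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The Python package version and the JVM-side version can differ if
# SPARK_HOME or the classpath points somewhere unexpected under cron/Airflow.
print("pyspark package:", pyspark.__version__)
print("JVM Spark:      ", spark.version)
print("SPARK_HOME:     ", os.environ.get("SPARK_HOME"))
```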

Thanks for your answer.
I am using exactly `org.elasticsearch_elasticsearch-spark-30_2.12-8.14.0.jar` as my connector jar:

```
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
      /_/

Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 11.0.23)
```
    .appName("spark") \
    .config("spark.sql.legacy.charVarcharAsString", "true") \
    .config("spark.jars", "/home/hadoop/W/org.elasticsearch_elasticsearch-spark-30_2.12-8.14.0.jar") \
    .config("spark.jars", "/home/hadoop/W/spark-catalyst_2.12-3.4.1.jar") \
    .config("spark.jars", "/home/hadoop/W/elasticsearch-hadoop-8.14.0.jar") \
    .config("spark.jars", "/home/hadoop/W/spark-sql_2.12-3.4.1.jar") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,org.apache.kafka:kafka-clients:3.4.1") \
    .getOrCreate()```

You don't ordinarily have to pass in the spark-sql and spark-catalyst jars, do you? Maybe something is not quite right in the environment. It is the same error we get when we run on Spark 3.5, where that `org.apache.spark.sql.catalyst.encoders.RowEncoder` method is not present. You could try enabling verbose class loading to get more information.
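For example (a sketch; `-verbose:class` is a standard JVM flag that logs every class as it is loaded, along with the jar it came from):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Log each class the JVM loads and the jar it was resolved from,
    # on both the driver and the executors, to see where the catalyst
    # encoder classes are actually coming from.
    .config("spark.driver.extraJavaOptions", "-verbose:class")
    .config("spark.executor.extraJavaOptions", "-verbose:class")
    .getOrCreate()
)
```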

Thank you very much.
My problem is solved. The cause was the difference in environment variables between my Ubuntu shell (you can see them with the `env` command) and the Airflow environment. I created a .sh file and added the differing variables to that bash script.
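For anyone who hits the same thing: the equivalent can also be done in Python before the SparkSession is created. The values below are placeholders; copy the real ones from what `env` prints in the shell where the job works.

```python
import os

# Placeholder paths: replace with the values `env` shows in the working shell.
os.environ.setdefault("JAVA_HOME", "/usr/lib/jvm/java-11-openjdk-amd64")
os.environ.setdefault("SPARK_HOME", "/opt/spark-3.4.1")
os.environ["PATH"] = os.environ["SPARK_HOME"] + "/bin:" + os.environ["PATH"]

# Import pyspark only after the environment is set up.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
```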
