Job fails when writing a DataFrame with variables in the index name

Hello,
I have this code that writes a DataFrame to an Elasticsearch 7.9.3 cluster:

myDataframe.saveToEs("customer-{year}.{month}")

But I'm getting this error:

User class threw exception: java.lang.Exception: Error(s) during treatment of stream data org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 1148, node06, executor 1): org.apache.spark.util.TaskCompletionListenerException: null
null
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:137)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

It starts writing data, but after 2 minutes it fails with this error.

But if I replace it with:
myDataframe.saveToEs("customer-2023.8")
it works.

And if I put back:
myDataframe.saveToEs("customer-{year}.{month}")
and run it as a superuser, it works.

But I can't keep using the superuser.

Can anyone help?
Thanks

Found a fix.
I was using org.elasticsearch:elasticsearch-spark-20_2.11:8.8.22 to send the data.
While inspecting the sent requests with Wireshark, there was a:
PUT /_bulk -> OK, that was my data being sent, and this request was ACKed.

But just after it, there was a:
POST /_refresh with this status: [FIN, ACK]

This means that the client ended the connection.

So I tried this POST _refresh in Postman, and indeed the request returns 403 Forbidden,
because it was trying to access system indices: .transform and .security-07.

Finally, I added this parameter to my Spark session:

.config(ES_BATCH_WRITE_REFRESH, "false")
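
For completeness, here is roughly what the session builder looks like with that option (a sketch: the app name and node settings are placeholders; ES_BATCH_WRITE_REFRESH comes from the connector's ConfigurationOptions and is just the string "es.batch.write.refresh"):

import org.apache.spark.sql.SparkSession
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_BATCH_WRITE_REFRESH

val spark = SparkSession.builder()
  .appName("customer-ingest")   // placeholder name
  .config("es.nodes", "node01") // placeholder Elasticsearch host
  .config("es.port", "9200")
  // Skip the refresh the connector issues after each bulk write; documents
  // become searchable on the index's own refresh_interval instead.
  .config(ES_BATCH_WRITE_REFRESH, "false")
  .getOrCreate()

Note that es.batch.write.refresh defaults to true in the connector, so a refresh after each bulk write is its normal behavior.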

And it worked, but I don't know if this was the correct way to fix the problem, and also why:

  • it suddenly started sending this refresh
  • or whether the refresh suddenly started accessing those system indices?

Thanks
