Bulk Request java.lang.InterruptedException concurrency issue

Hello,

Context:
We have a simple Flink job (v1.11.2, written in Scala) that reads from Kafka and writes to ES, using the Table API with just 2 joins before the write. It uses the Flink Elasticsearch Connector, which under the hood uses the BulkProcessor from the Java ES SDK.

Error:
We continuously receive these java.lang.InterruptedException errors, which kill our job and are a showstopper for us. The exception always seems to be thrown either at latch.await() [link] or at semaphore.acquire() [link]. Retrying usually succeeds, and then the job crashes on a later document. This suggests to us that it's some sort of concurrency issue. Flink uses Elasticsearch SDK version 7.5.1. We tried manually replacing it with 7.9 but hit the same issue.

We dug in a bit more and found https://github.com/elastic/elasticsearch/issues/36782 which seems to indicate that this was failing tests at one point and then stopped happening.

We really need help getting past this error. An error stack trace is copied below; we exceeded the character limit.

Thanks

    2020.10.27 23:52:01 INFO  23:52:01,744 INFO  org.elasticsearch.action.bulk.BulkRequestHandler             [] - Bulk request 13391 has been cancelled.
    2020.10.27 23:52:01 INFO  java.lang.InterruptedException: null
    2020.10.27 23:52:01 INFO  	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1040) ~[?:?]
    2020.10.27 23:52:01 INFO  	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345) ~[?:?]
    2020.10.27 23:52:01 INFO  	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232) ~[?:?]
    2020.10.27 23:52:01 INFO  	at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) [elasticsearch-7.5.1.jar:7.5.1]
    2020.10.27 23:52:01 INFO  	at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) [elasticsearch-7.5.1.jar:7.5.1]
    2020.10.27 23:52:01 INFO  	at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:389) [elasticsearch-7.5.1.jar:7.5.1]
    2020.10.27 23:52:01 INFO  	at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:361) [elasticsearch-7.5.1.jar:7.5.1]
    2020.10.27 23:52:01 INFO  	at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:347) [elasticsearch-7.5.1.jar:7.5.1]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7BulkProcessorIndexer.add(Elasticsearch7BulkProcessorIndexer.java:72) [flink-connector-elasticsearch7_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at com.remind.graph.Job$$anon$13.process(Job.scala:312) [classes-Metals-Vx9exDKYQoitgbxsINrlCA%3d%3d/:?]
    2020.10.27 23:52:01 INFO  	at com.remind.graph.Job$$anon$13.process(Job.scala:286) [classes-Metals-Vx9exDKYQoitgbxsINrlCA%3d%3d/:?]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:310) [flink-connector-elasticsearch-base_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at SinkConversion$136.processElement(Unknown Source) [flink-table-planner-blink_2.12-1.11.2.jar:?]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at StreamExecCalc$131.processElement(Unknown Source) [flink-table-planner-blink_2.12-1.11.2.jar:?]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:287) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:120) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:142) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:105) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-runtime_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-runtime_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at java.lang.Thread.run(Thread.java:834) [?:?]
    2020.10.27 23:52:01 INFO  23:52:01,745 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase [] - Failed Elasticsearch bulk request: null
    2020.10.27 23:52:01 INFO  java.lang.InterruptedException: null
    2020.10.27 23:52:01 INFO  	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1040) ~[?:?]
    2020.10.27 23:52:01 INFO  	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345) ~[?:?]
    2020.10.27 23:52:01 INFO  	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232) ~[?:?]
    2020.10.27 23:52:01 INFO  	at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) [elasticsearch-7.5.1.jar:7.5.1]
    2020.10.27 23:52:01 INFO  	at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) [elasticsearch-7.5.1.jar:7.5.1]
    2020.10.27 23:52:01 INFO  	at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:389) [elasticsearch-7.5.1.jar:7.5.1]
    2020.10.27 23:52:01 INFO  	at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:361) [elasticsearch-7.5.1.jar:7.5.1]
    2020.10.27 23:52:01 INFO  	at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:347) [elasticsearch-7.5.1.jar:7.5.1]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7BulkProcessorIndexer.add(Elasticsearch7BulkProcessorIndexer.java:72) [flink-connector-elasticsearch7_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at com.remind.graph.Job$$anon$13.process(Job.scala:312) [classes-Metals-Vx9exDKYQoitgbxsINrlCA%3d%3d/:?]
    2020.10.27 23:52:01 INFO  	at com.remind.graph.Job$$anon$13.process(Job.scala:286) [classes-Metals-Vx9exDKYQoitgbxsINrlCA%3d%3d/:?]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:310) [flink-connector-elasticsearch-base_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at SinkConversion$136.processElement(Unknown Source) [flink-table-planner-blink_2.12-1.11.2.jar:?]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at StreamExecCalc$131.processElement(Unknown Source) [flink-table-planner-blink_2.12-1.11.2.jar:?]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:287) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:120) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:142) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:105) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-runtime_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-runtime_2.12-1.11.2.jar:1.11.2]
    2020.10.27 23:52:01 INFO  	at java.lang.Thread.run(Thread.java:834) [?:?]

That looks totally unrelated -- the issue you link was a (benign) race condition when shutting down a node, whereas you are experiencing a problem in your client.

java.lang.InterruptedException means that someone called Thread#interrupt() on the thread in question. AFAIK the only time the Elasticsearch client does so is when it is closing. Therefore either you are closing your Client while the bulk requests are still running, or else something else in your client is interrupting this thread.
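To make that concrete, here is a minimal sketch in plain Java (no Elasticsearch or Flink involved) of how a thread blocked in CountDownLatch.await() ends up with an InterruptedException when some other thread calls Thread#interrupt() on it. The class name InterruptDemo, the method interruptWhileAwaiting and the thread name are made up for illustration; this is not the BulkRequestHandler code, only the same blocking pattern that appears at the top of the stack trace.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptDemo {

    // Blocks a worker thread on latch.await(), interrupts it from the calling
    // thread, and returns whatever exception the worker observed (null if none).
    static Throwable interruptWhileAwaiting() throws InterruptedException {
        // Nothing ever counts this latch down, so await() can only end via interrupt.
        CountDownLatch neverReleased = new CountDownLatch(1);
        AtomicReference<Throwable> seen = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                neverReleased.await();
            } catch (InterruptedException e) {
                // Same exception type as in the trace: raised because of the
                // interrupt, not because of anything the latch itself did.
                seen.set(e);
            }
        }, "hypothetical-bulk-caller");

        worker.start();
        // Stand-in for "someone else" (for example a framework cancelling the task).
        worker.interrupt();
        worker.join();
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptWhileAwaiting());
    }
}
```

The point of the sketch is that the exception says nothing about the latch or the request being waited on; it only says that the waiting thread was interrupted, so the thing to look for is whoever issued the interrupt.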

I think we traced it down to Flink killing the client. Thanks for the help.
