Context:
We have a simple Flink job (v1.11.2, Scala) that reads from Kafka and writes to ES using the Table API, with just 2 joins before the ES sink. It uses the Flink Elasticsearch connector, which under the hood uses the Java ES SDK's BulkProcessor.
Error:
We continuously receive these java.lang.InterruptedException errors, and they kill our job, which is a showstopper for us. The exception always seems to be thrown either at latch.await() or at semaphore.acquire(). Retrying usually succeeds, but then the job crashes on a later document. This suggests to us that it's some sort of concurrency issue. Flink uses Elasticsearch SDK version 7.5.1; we tried manually replacing it with 7.9 but hit the same issue.
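For reference, both call sites named above are interruptible blocking calls from java.util.concurrent. Per their Javadoc, CountDownLatch.await() and Semaphore.acquire() throw InterruptedException if the calling thread's interrupt status is set on entry, or if the thread is interrupted while waiting. The minimal sketch below uses only the JDK, with no Elasticsearch or Flink classes, and the class and method names are hypothetical. It sets the interrupt flag on the current thread before each call. This shows that the exception is triggered by the interrupt flag rather than by the state of the latch or semaphore.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

public class InterruptibleWaits {

    // Hypothetical helper: true if latch.await() throws InterruptedException for a pre-interrupted thread.
    static boolean latchAwaitThrowsWhenInterrupted() {
        CountDownLatch latch = new CountDownLatch(1); // never counted down, so await() would otherwise block
        Thread.currentThread().interrupt();            // simulate "someone called Thread#interrupt()"
        try {
            latch.await();
            return false;
        } catch (InterruptedException e) {
            return true; // throwing also clears the thread's interrupt flag
        }
    }

    // Hypothetical helper: same check for Semaphore.acquire().
    static boolean semaphoreAcquireThrowsWhenInterrupted() {
        Semaphore semaphore = new Semaphore(0); // no permits, so acquire() would otherwise block
        Thread.currentThread().interrupt();
        try {
            semaphore.acquire();
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("latch.await() threw: " + latchAwaitThrowsWhenInterrupted());
        System.out.println("semaphore.acquire() threw: " + semaphoreAcquireThrowsWhenInterrupted());
        System.out.println("interrupt flag still set: " + Thread.currentThread().isInterrupted());
    }
}
```

Running main should print true for both calls and false for the final check, since throwing InterruptedException clears the interrupt flag.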
2020.10.27 23:52:01 INFO 23:52:01,744 INFO org.elasticsearch.action.bulk.BulkRequestHandler [] - Bulk request 13391 has been cancelled.
2020.10.27 23:52:01 INFO java.lang.InterruptedException: null
2020.10.27 23:52:01 INFO at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1040) ~[?:?]
2020.10.27 23:52:01 INFO at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345) ~[?:?]
2020.10.27 23:52:01 INFO at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232) ~[?:?]
2020.10.27 23:52:01 INFO at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) [elasticsearch-7.5.1.jar:7.5.1]
2020.10.27 23:52:01 INFO at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) [elasticsearch-7.5.1.jar:7.5.1]
2020.10.27 23:52:01 INFO at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:389) [elasticsearch-7.5.1.jar:7.5.1]
2020.10.27 23:52:01 INFO at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:361) [elasticsearch-7.5.1.jar:7.5.1]
2020.10.27 23:52:01 INFO at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:347) [elasticsearch-7.5.1.jar:7.5.1]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7BulkProcessorIndexer.add(Elasticsearch7BulkProcessorIndexer.java:72) [flink-connector-elasticsearch7_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at com.remind.graph.Job$$anon$13.process(Job.scala:312) [classes-Metals-Vx9exDKYQoitgbxsINrlCA%3d%3d/:?]
2020.10.27 23:52:01 INFO at com.remind.graph.Job$$anon$13.process(Job.scala:286) [classes-Metals-Vx9exDKYQoitgbxsINrlCA%3d%3d/:?]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:310) [flink-connector-elasticsearch-base_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at SinkConversion$136.processElement(Unknown Source) [flink-table-planner-blink_2.12-1.11.2.jar:?]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at StreamExecCalc$131.processElement(Unknown Source) [flink-table-planner-blink_2.12-1.11.2.jar:?]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:287) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:120) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:142) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:105) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-runtime_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-runtime_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at java.lang.Thread.run(Thread.java:834) [?:?]
2020.10.27 23:52:01 INFO 23:52:01,745 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase [] - Failed Elasticsearch bulk request: null
2020.10.27 23:52:01 INFO java.lang.InterruptedException: null
2020.10.27 23:52:01 INFO at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1040) ~[?:?]
2020.10.27 23:52:01 INFO at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345) ~[?:?]
2020.10.27 23:52:01 INFO at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232) ~[?:?]
2020.10.27 23:52:01 INFO at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) [elasticsearch-7.5.1.jar:7.5.1]
2020.10.27 23:52:01 INFO at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) [elasticsearch-7.5.1.jar:7.5.1]
2020.10.27 23:52:01 INFO at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:389) [elasticsearch-7.5.1.jar:7.5.1]
2020.10.27 23:52:01 INFO at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:361) [elasticsearch-7.5.1.jar:7.5.1]
2020.10.27 23:52:01 INFO at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:347) [elasticsearch-7.5.1.jar:7.5.1]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7BulkProcessorIndexer.add(Elasticsearch7BulkProcessorIndexer.java:72) [flink-connector-elasticsearch7_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at com.remind.graph.Job$$anon$13.process(Job.scala:312) [classes-Metals-Vx9exDKYQoitgbxsINrlCA%3d%3d/:?]
2020.10.27 23:52:01 INFO at com.remind.graph.Job$$anon$13.process(Job.scala:286) [classes-Metals-Vx9exDKYQoitgbxsINrlCA%3d%3d/:?]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:310) [flink-connector-elasticsearch-base_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at SinkConversion$136.processElement(Unknown Source) [flink-table-planner-blink_2.12-1.11.2.jar:?]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at StreamExecCalc$131.processElement(Unknown Source) [flink-table-planner-blink_2.12-1.11.2.jar:?]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:287) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:120) [flink-table-runtime-blink_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:142) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:105) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) [flink-streaming-java_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-runtime_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-runtime_2.12-1.11.2.jar:1.11.2]
2020.10.27 23:52:01 INFO at java.lang.Thread.run(Thread.java:834) [?:?]
That looks totally unrelated: the issue you linked was a (benign) race condition when shutting down a node, whereas you are experiencing a problem in your client.
java.lang.InterruptedException means that someone called Thread#interrupt() on the thread in question. AFAIK the only time the Elasticsearch client does so is when it is closing. Therefore, either you are closing your client while bulk requests are still running, or something else in your client is interrupting this thread.
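To illustrate the explanation above, here is a minimal JDK-only sketch. The class and method names are hypothetical, and this is not the Elasticsearch client's implementation. A worker thread blocks on a CountDownLatch, which stands in for the latch.await() inside BulkRequestHandler.execute from the stack trace. If another thread interrupts the worker while it is still waiting, as a shutdown path might, the wait ends with InterruptedException. If the "response" arrives first, the wait completes normally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptOnClose {

    // Hypothetical helper: returns "completed" or "interrupted" depending on how the worker's wait ended.
    static String runScenario(boolean interruptWhileWaiting) {
        CountDownLatch response = new CountDownLatch(1); // stands in for "bulk response received"
        AtomicReference<String> outcome = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                response.await(); // blocks until the response arrives or the thread is interrupted
                outcome.set("completed");
            } catch (InterruptedException e) {
                outcome.set("interrupted");
                Thread.currentThread().interrupt(); // conventional: restore the interrupt flag
            }
        });
        worker.start();

        if (interruptWhileWaiting) {
            worker.interrupt();   // e.g. a shutdown path interrupting the thread that is still waiting
        } else {
            response.countDown(); // the response is delivered before anything is closed
        }

        try {
            worker.join(); // join() guarantees the worker's write to outcome is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining the worker", e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted while waiting: " + runScenario(true));
        System.out.println("response arrived first:    " + runScenario(false));
    }
}
```

runScenario(true) returns "interrupted" and runScenario(false) returns "completed". The catch block restores the interrupt flag so that code further up the stack can still see that a shutdown was requested. This is the usual convention when handling InterruptedException.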