[Reactor] Parallel streams cause APM log spam and possibly missed spans

APM Agent language and version: Java Agent 1.51.0

Original install method (e.g. download page, yum, deb, from source, etc.) and version:

Manually via curl and run with -javaagent. It is also separately used within a Docker container where it uses the documented Docker method to run with the -javaagent.

Fresh install or upgraded from other version?

Problem also happened in 1.49.0.

Is there anything special in your setup?

Not with the way data flows to APM.

Description of the problem including expected versus actual behavior:

Expected: no warnings or errors indicating that Span::end has already been called on spans that the agent creates and ends automatically. Actual: the agent repeatedly logs the warning and exception shown below.

Steps to reproduce:

Based on an attempt to reproduce this without Kafka, it appears that this problem relates specifically to reactor-kafka (as opposed to reactor-core).

  1. With the Reactor framework, build a pipeline that uses .metrics().parallel().runOn(...), so that the work runs on a scheduler.
    • Removing .parallel().runOn(...) resolves the issue
  2. Observe that automatic APM transaction generation results in log spam.

Provide logs and/or server output (if relevant):

These two log entries are repeated over and over:

2024-09-19 14:34:39,512 [reactive-kafka-indexer-2] WARN  co.elastic.apm.agent.impl.transaction.AbstractSpanImpl - End has already been called: 'Kafka record from metrics' 00-43a0fae3700192c45c4e3b66d9ab7fc6-b97bfcb6a5952a35-01 (1427a3aa)
2024-09-19 14:34:39,614 [reactive-kafka-indexer-2] ERROR co.elastic.apm.agent.bci.IndyBootstrap - Advice threw an exception, this should never happen!
java.util.NoSuchElementException: null
	at java.util.ArrayDeque.removeFirst(ArrayDeque.java:361) ~[?:?]
	at java.util.ArrayDeque.remove(ArrayDeque.java:522) ~[?:?]
	at co.elastic.apm.agent.impl.ActiveStack.deactivate(ActiveStack.java:119) ~[elastic-apm-agent-1.51.0.jar:1.51.0]
	at co.elastic.apm.agent.impl.ElasticApmTracer.deactivate(ElasticApmTracer.java:940) ~[elastic-apm-agent-1.51.0.jar:1.51.0]
	at co.elastic.apm.agent.impl.transaction.TraceStateImpl.deactivate(TraceStateImpl.java:71) ~[elastic-apm-agent-1.51.0.jar:1.51.0]
	at co.elastic.apm.agent.impl.transaction.TraceStateImpl.deactivate(TraceStateImpl.java:33) ~[elastic-apm-agent-1.51.0.jar:1.51.0]
	at co.elastic.apm.agent.concurrent.RunnableCallableForkJoinTaskInstrumentation$AdviceClass.onExit(RunnableCallableForkJoinTaskInstrumentation.java:83) ~[elastic-apm-agent-1.51.0.jar:1.51.0]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:87) ~[reactor-core-3.5.11.jar:3.5.11]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.5.11.jar:3.5.11]
	at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	at java.lang.Thread.run(Thread.java:1570) [?:?]
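
For readers following the stack trace: the exception comes from ArrayDeque.remove() being called on an empty deque inside ActiveStack.deactivate. The sketch below is not the agent's code. It is a minimal, hypothetical model (the class ActiveStackSketch and its methods are invented for illustration) that assumes a per-thread stack of active contexts, and shows how a deactivate that runs on a different thread from the matching activate would raise the same NoSuchElementException.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ActiveStackSketch {
    // Hypothetical stand-in for a per-thread stack of active contexts.
    // This is an assumption for illustration, not the agent's implementation.
    static final ThreadLocal<Deque<String>> ACTIVE =
            ThreadLocal.withInitial(ArrayDeque::new);

    static void activate(String context) {
        ACTIVE.get().push(context);
    }

    static void deactivate() {
        // ArrayDeque.remove() delegates to removeFirst(), which throws
        // NoSuchElementException when the deque is empty.
        ACTIVE.get().remove();
    }

    // Activates on the calling thread, then deactivates on a worker thread
    // whose own stack is empty. Returns true if the worker-side call threw.
    public static boolean deactivateOnWorkerThrows() throws Exception {
        activate("Kafka record from metrics");
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> result = worker.submit(() -> {
                try {
                    deactivate();
                    return false;
                } catch (NoSuchElementException e) {
                    return true;
                }
            });
            return result.get();
        } finally {
            worker.shutdown();
            deactivate(); // balanced deactivate on the thread that activated
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("worker deactivate threw: " + deactivateOnWorkerThrows());
    }
}
```

Whether the agent's activate and deactivate calls actually end up on different threads in the reactor-kafka case is not confirmed here; the sketch only shows one way an unbalanced deactivate produces this exact exception.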

Can you try the artifact in this snapshot (e.g. this agent jar) and see if the functionality is what you want?

Hey @Jack_Shirazi, this changes the behavior and avoids the stack trace, but it has introduced a new warning:

2024-10-07 11:30:30,125 [reactive-kafka-indexer-2] WARN  co.elastic.apm.agent.impl.transaction.AbstractSpanImpl - End has already been called: 'Kafka record from metrics' 00-267240554c7e5f88ff830484f04d42b3-37d4763c1ab58522-01 (691e56cb)
2024-10-07 11:30:30,226 [reactive-kafka-indexer-2] WARN  co.elastic.apm.agent.impl.ActiveStack - Deactivating a context ('Kafka record from metrics' 00-267240554c7e5f88ff830484f04d42b3-37d4763c1ab58522-01 (691e56cb)) which is not the currently active one (co.elastic.apm.agent.impl.EmptyTraceState@213815fd). This can happen when not properly deactivating a previous span or context.
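
The "End has already been called" warning appears both before and after the snapshot, which suggests the same span is being ended by more than one code path. As a rough illustration only (the EndOnceSketch class and its Span type are hypothetical and not part of the agent), a guard of this kind can be modeled with an atomic flag: the first end() wins and any later call only logs.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class EndOnceSketch {
    // Hypothetical span type, invented for this illustration.
    public static final class Span {
        private final String name;
        private final AtomicBoolean ended = new AtomicBoolean(false);

        public Span(String name) {
            this.name = name;
        }

        // Returns true only for the first call; later calls log a warning
        // and return false instead of ending the span a second time.
        public boolean end() {
            if (!ended.compareAndSet(false, true)) {
                System.err.println("End has already been called: '" + name + "'");
                return false;
            }
            return true;
        }
    }

    public static void main(String[] args) {
        Span span = new Span("Kafka record from metrics");
        System.out.println("first end accepted: " + span.end());
        System.out.println("second end accepted: " + span.end());
    }
}
```

Under this model the warning itself is harmless, but each occurrence points at a second caller of end(), which is the part worth tracking down in the parallel pipeline.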