[Reactor] Parallel Streams cause APM log spam and possibly missed spans

APM Agent language and version: Java Agent 1.51.0

Original install method (e.g. download page, yum, deb, from source, etc.) and version:

Downloaded manually via curl and run with -javaagent. The agent is also used separately within a Docker container, where it follows the documented Docker method to run with -javaagent.
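
For reference, the agent is attached roughly like this (the install path and application jar name here are illustrative):

java -javaagent:/opt/elastic-apm-agent-1.51.0.jar -jar app.jar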

Fresh install or upgraded from other version?

The problem also happened in 1.49.0.

Is there anything special in your setup?

Not with the way data flows to APM.

Description of the problem including expected versus actual behavior:

Expected: the agent's automatically generated transaction handling produces no errors. Actual: the agent repeatedly warns that Span::end has already been called and throws a NoSuchElementException from instrumentation advice (see the logs below).

Steps to reproduce:

Based on an attempt to reproduce this without Kafka, the problem appears to be specific to reactor-kafka rather than reactor-core.

  1. With reactor-kafka, build a pipeline that uses .metrics().parallel().runOn(...) and therefore hands work to a scheduler (see the sketch after this list).
    • Removing .parallel().runOn(...) resolves the issue
  2. Observe that automatic APM transaction generation results in log spam
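
A minimal sketch of the failing pipeline shape. The topic name matches the span name in the logs below; the broker address, group id, deserializers, and class name are illustrative, not the actual application code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class ParallelKafkaRepro {

    public static void main(String[] args) throws InterruptedException {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<String, String> options =
                ReceiverOptions.<String, String>create(props).subscription(List.of("metrics"));

        KafkaReceiver.create(options)
                .receive()                    // Flux<ReceiverRecord<String, String>>
                .metrics()
                .parallel()                   // removing .parallel().runOn(...) resolves the issue
                .runOn(Schedulers.parallel()) // records are now processed on scheduler threads
                .doOnNext(record -> record.receiverOffset().acknowledge())
                .subscribe();

        Thread.sleep(Long.MAX_VALUE); // keep the JVM alive while the pipeline runs
    }
}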

Provide logs and/or server output (if relevant):

These two log entries are repeated over and over:

2024-09-19 14:34:39,512 [reactive-kafka-indexer-2] WARN  co.elastic.apm.agent.impl.transaction.AbstractSpanImpl - End has already been called: 'Kafka record from metrics' 00-43a0fae3700192c45c4e3b66d9ab7fc6-b97bfcb6a5952a35-01 (1427a3aa)
2024-09-19 14:34:39,614 [reactive-kafka-indexer-2] ERROR co.elastic.apm.agent.bci.IndyBootstrap - Advice threw an exception, this should never happen!
java.util.NoSuchElementException: null
	at java.util.ArrayDeque.removeFirst(ArrayDeque.java:361) ~[?:?]
	at java.util.ArrayDeque.remove(ArrayDeque.java:522) ~[?:?]
	at co.elastic.apm.agent.impl.ActiveStack.deactivate(ActiveStack.java:119) ~[elastic-apm-agent-1.51.0.jar:1.51.0]
	at co.elastic.apm.agent.impl.ElasticApmTracer.deactivate(ElasticApmTracer.java:940) ~[elastic-apm-agent-1.51.0.jar:1.51.0]
	at co.elastic.apm.agent.impl.transaction.TraceStateImpl.deactivate(TraceStateImpl.java:71) ~[elastic-apm-agent-1.51.0.jar:1.51.0]
	at co.elastic.apm.agent.impl.transaction.TraceStateImpl.deactivate(TraceStateImpl.java:33) ~[elastic-apm-agent-1.51.0.jar:1.51.0]
	at co.elastic.apm.agent.concurrent.RunnableCallableForkJoinTaskInstrumentation$AdviceClass.onExit(RunnableCallableForkJoinTaskInstrumentation.java:83) ~[elastic-apm-agent-1.51.0.jar:1.51.0]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:87) ~[reactor-core-3.5.11.jar:3.5.11]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.5.11.jar:3.5.11]
	at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	at java.lang.Thread.run(Thread.java:1570) [?:?]

Can you try the artifact in this snapshot, e.g. this agent jar, and see if the functionality is what you want?

Hey @Jack_Shirazi, this changes the behavior and avoids the stack trace, but it has introduced a new warning:

2024-10-07 11:30:30,125 [reactive-kafka-indexer-2] WARN  co.elastic.apm.agent.impl.transaction.AbstractSpanImpl - End has already been called: 'Kafka record from metrics' 00-267240554c7e5f88ff830484f04d42b3-37d4763c1ab58522-01 (691e56cb)
2024-10-07 11:30:30,226 [reactive-kafka-indexer-2] WARN  co.elastic.apm.agent.impl.ActiveStack - Deactivating a context ('Kafka record from metrics' 00-267240554c7e5f88ff830484f04d42b3-37d4763c1ab58522-01 (691e56cb)) which is not the currently active one (co.elastic.apm.agent.impl.EmptyTraceState@213815fd). This can happen when not properly deactivating a previous span or context.

Thanks, I kind of expected that, since the only way the active stack could be misaligned is if context propagation is not happening correctly somewhere in the reactor-kafka implementation. Is it functionally working for you? I'll open a low-priority issue for this, but unless there's a functional problem it will stay like this for a while.
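
For context, the warning refers to the agent's activate/deactivate pairing. With the public API the expected shape is roughly the following (a minimal sketch; the class and method names are illustrative, not the instrumentation code itself):

import co.elastic.apm.api.ElasticApm;
import co.elastic.apm.api.Scope;
import co.elastic.apm.api.Span;

public class ActivationContract {

    static void handleRecord(Runnable processing) {
        // Create a child of whatever span is currently active.
        Span span = ElasticApm.currentSpan().startSpan();
        // Each activation must be closed on the same thread, and a span must not
        // be ended twice; the warnings above indicate both rules were broken when
        // the pipeline hopped between scheduler threads.
        try (Scope scope = span.activate()) {
            processing.run();
        } finally {
            span.end();
        }
    }
}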

The work itself is being done correctly, but it's hard to say whether APM is reporting correctly, because we have to sample the data aggressively, so it already has a lot of holes in it.

Well, if you identify something more, please update the issue.