APM - KafkaProducer is not instrumented

Environment:
Java 11
APM agent: v1.36.0
Apache Camel 3.20.1
Kafka client v3.1.1

Scenario:
I have a small Apache Camel application with Kafka integration. A consumer reads data from one Kafka cluster and writes the same data to another Kafka cluster. The incoming message is a simple Kafka message with some payload and no headers. The application is instrumented with the Elastic APM Java agent.

Expectation:
The incoming message from the Kafka consumer is caught, and a new trace.id and span.id are created (since there is no traceparent in the headers). The tracing information is then injected into the Kafka headers as a traceparent or elasticapmtraceparent header. The instrumented producer then reuses the same trace.id but creates a new span.id.

Reality:
A Kafka message comes in, APM catches it from KafkaConsumer, starts a new transaction, reports it to the APM server, and ends it. The trace information is not propagated any further. KafkaProducer is not instrumented at all. I am left with only the KafkaConsumer trace.

Scenario 2:
Same as above, but the Kafka header "elasticapmtraceparent" exists. APM for KafkaConsumer correctly reuses the trace.id and creates a new span.id. The header is then propagated to the KafkaProducer, but the producer is still not instrumented. Only the consumer trace is stored in the APM server.

Pseudocode:

.from(kafka(topicA/serverA))
.to(kafka(topicB/serverB))

Q1:
Should APM inject elasticapmtraceparent into the Kafka headers when a message is received by KafkaConsumer, or is that my job?
Q2:
How can I access the newly generated trace.id and span.id so I can modify the Kafka headers myself?
Q3:
Why is KafkaProducer not instrumented at all? Does it have something to do with the Apache Camel framework?

Logs:

2023-02-16 13:47:24,190 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.transaction.AbstractSpan - increment references to '' 00-6d9521f67755a0dbf26dff65a60d6e41-72fec63b03a0aae8-01 (281148ab) (1)
2023-02-16 13:47:24,198 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.ElasticApmTracer - startTransaction '' 00-6d9521f67755a0dbf26dff65a60d6e41-72fec63b03a0aae8-01 (281148ab)
2023-02-16 13:47:24,199 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.ActiveStack - Activating 'Kafka record from topicA' 00-6d9521f67755a0dbf26dff65a60d6e41-72fec63b03a0aae8-01 (281148ab) on thread 70
2023-02-16 13:47:24,199 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.transaction.AbstractSpan - increment references to 'Kafka record from topicA' 00-6d9521f67755a0dbf26dff65a60d6e41-72fec63b03a0aae8-01 (281148ab) (2)
2023-02-16 13:47:24,241 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.ActiveStack - Deactivating 'Kafka record from topicA' 00-6d9521f67755a0dbf26dff65a60d6e41-72fec63b03a0aae8-01 (281148ab) on thread 70
2023-02-16 13:47:24,243 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.transaction.AbstractSpan - decrement references to 'Kafka record from topicA' 00-6d9521f67755a0dbf26dff65a60d6e41-72fec63b03a0aae8-01 (281148ab) (1)
2023-02-16 13:47:24,247 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.ElasticApmTracer - endTransaction 'Kafka record from topicA' 00-6d9521f67755a0dbf26dff65a60d6e41-72fec63b03a0aae8-01 (281148ab)
2023-02-16 13:47:24,249 [elastic-apm-server-reporter] DEBUG co.elastic.apm.agent.report.IntakeV2ReportingEventHandler - Receiving TRANSACTION event (sequence 525)
2023-02-16 13:47:24,255 [elastic-apm-server-reporter] DEBUG co.elastic.apm.agent.util.UrlConnectionUtils - Opening https://elasticserver/intake/v2/events without proxy
2023-02-16 13:47:24,257 [elastic-apm-server-reporter] DEBUG co.elastic.apm.agent.report.AbstractIntakeApiHandler - Starting new request to https://elasticserver/intake/v2/events
2023-02-16 13:47:24,272 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] WARN  co.elastic.apm.agent.impl.transaction.TraceContext - Wrong parent-id field identifier: 53
2023-02-16 13:47:24,273 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.transaction.AbstractSpan - increment references to '' 00-2717681907777a96cbdf4059d0eb76a3-ab0c115485190184-01 (87a0c83) (1)
2023-02-16 13:47:24,274 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.ElasticApmTracer - startTransaction '' 00-2717681907777a96cbdf4059d0eb76a3-ab0c115485190184-01 (87a0c83)
2023-02-16 13:47:24,274 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.ActiveStack - Activating 'Kafka record from topicA' 00-2717681907777a96cbdf4059d0eb76a3-ab0c115485190184-01 (87a0c83) on thread 70
2023-02-16 13:47:24,274 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.transaction.AbstractSpan - increment references to 'Kafka record from topicA' 00-2717681907777a96cbdf4059d0eb76a3-ab0c115485190184-01 (87a0c83) (2)
2023-02-16 13:47:24,276 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.ActiveStack - Deactivating 'Kafka record from topicA' 00-2717681907777a96cbdf4059d0eb76a3-ab0c115485190184-01 (87a0c83) on thread 70
2023-02-16 13:47:24,277 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.transaction.AbstractSpan - decrement references to 'Kafka record from topicA' 00-2717681907777a96cbdf4059d0eb76a3-ab0c115485190184-01 (87a0c83) (1)
2023-02-16 13:47:24,277 [Camel (camel-1) thread #6 - KafkaConsumer[topicA]] DEBUG co.elastic.apm.agent.impl.ElasticApmTracer - endTransaction 'Kafka record from topicA' 00-2717681907777a96cbdf4059d0eb76a3-ab0c115485190184-01 (87a0c83)

The Kafka producer is instrumented, but it only creates a span if a transaction is already active. That is not the case here because, as you noted, the consumer is a complete transaction on its own. If you had a method like

forward() {
    message = getkafka(topicA/serverA);
    send(message, kafka(topicB/serverB));
}

then you could set forward() to be instrumented and you'd get what you want. But as described, there is no way for the agent to know that the consumer and producer are connected.
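
As a rough illustration of "set forward() to be instrumented": the agent's trace_methods option accepts class#method patterns and creates a transaction or span around matching methods. The fragment below is only a sketch. The class name com.example.Forwarder is hypothetical and stands for whatever class would hold the forward() method above. A Camel DSL route has no such method out of the box, so this assumes the forwarding logic is moved into one.

```properties
# elasticapm.properties (sketch; com.example.Forwarder is a hypothetical class)
# Wraps each call to forward() in a transaction, so a producer send made
# inside it has an active transaction to attach its span to.
trace_methods=com.example.Forwarder#forward
```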

You can access the IDs through the agent's public API.
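
For Q2, a minimal sketch of what could be done with those IDs once you have them. In the public API they would typically come from ElasticApm.currentTransaction().getTraceId() and getId(), which return empty strings when no transaction is active. To keep the example free of dependencies, the IDs are passed in as plain strings here, and the sample values are taken from the first transaction in the log above. The class name TraceParentHeader is made up for this example. The output is the W3C text form of traceparent. Whether your agent version reads this text form from Kafka headers is an assumption you would need to verify.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical helper: builds a W3C "traceparent" value from existing IDs.
final class TraceParentHeader {
    private static final Pattern TRACE_ID = Pattern.compile("[0-9a-f]{32}");
    private static final Pattern SPAN_ID = Pattern.compile("[0-9a-f]{16}");

    // Layout: version "00", 32 hex chars trace id, 16 hex chars parent id, 2 hex chars flags.
    static String format(String traceId, String parentId, boolean sampled) {
        if (!TRACE_ID.matcher(traceId).matches()) {
            throw new IllegalArgumentException("trace id must be 32 lowercase hex chars");
        }
        if (!SPAN_ID.matcher(parentId).matches()) {
            throw new IllegalArgumentException("parent id must be 16 lowercase hex chars");
        }
        return "00-" + traceId + "-" + parentId + "-" + (sampled ? "01" : "00");
    }

    public static void main(String[] args) {
        // IDs copied from the first transaction in the log excerpt.
        String value = format("6d9521f67755a0dbf26dff65a60d6e41", "72fec63b03a0aae8", true);
        System.out.println(value);

        // Kafka header values are byte arrays, so the text would be encoded before being set.
        byte[] headerBytes = value.getBytes(StandardCharsets.US_ASCII);
        System.out.println(headerBytes.length + " bytes");
    }
}
```

The parent id in the header is the id of the transaction or span that is active when the message is sent. The receiving side then creates its own new span.id under the same trace.id.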
