Hi there,
I currently have a microservices architecture composed of Spring Boot services that communicate synchronously via REST and asynchronously via Kafka messages.
While the synchronous communication is fully instrumented out of the box by the Elastic APM Java agent, Kafka, on the other hand, isn't instrumented yet, so I'm trying to implement it myself using the Public API.
The producer injects trace headers into the Kafka message using a ProducerInterceptor.
Kafka Configuration
public ProducerFactory<String, Message> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSettings.getBootstrapAddresses());
    configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    return new DefaultKafkaProducerFactory<>(configProps);
}
This interceptor passes the Trace headers into the Kafka message headers.
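public class TracingProducerInterceptor implements ProducerInterceptor<String, Message> {
    @Override
    public ProducerRecord<String, Message> onSend(ProducerRecord<String, Message> record) {
        // Copy each trace header into the Kafka record headers as UTF-8 bytes.
        ElasticApm.currentTransaction().injectTraceHeaders((key, value) ->
                record.headers().add(key, value.getBytes(StandardCharsets.UTF_8)));
        return record;
    }
    // onAcknowledgement, close and configure omitted
}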
The consumer consumes messages and starts a new transaction with a remote parent (startTransactionWithRemoteParent) using a RecordInterceptor and an AOP aspect.
Kafka Configuration
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory(KafkaSettings kafkaSettings) {
    ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setRecordInterceptor(new TracingRecordInterceptor());
    return factory;
}
One of many Kafka listeners
@KafkaListener(topics = "#{kafkaSettings.commandsTopic}")
public void consume(Message msg) {
    messageProcessor.process(msg.getDetails(), msg.getRetries());
}
This interceptor is called just before the @KafkaListener method executes.
public class TracingRecordInterceptor implements RecordInterceptor<String, Message> {
    @Override
    public ConsumerRecord<String, Message> intercept(ConsumerRecord<String, Message> record) {
        // Start a transaction whose parent is read from the "elastic-apm*" record headers.
        Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
            if (key.startsWith("elastic-apm") && record.headers().lastHeader(key) != null) {
                return new String(record.headers().lastHeader(key).value(), StandardCharsets.UTF_8);
            }
            return null;
        });
        transaction.addLabel("topic", record.topic());
        transaction.addLabel("type", record.value().type);
        transaction.addLabel("id", record.value().id);
        return record;
    }
}
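To make the header hand-off easier to reason about in isolation, here is a minimal sketch in plain Java with no Kafka or agent dependency. The `Header` class and the `extract` helper are hypothetical stand-ins for the Kafka record headers and for the lookup lambda above, and the header name used in `main` is only illustrative. It mirrors two details of the interceptor: only the last header with a given key is used, and the bytes are decoded with an explicit charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka record headers: an ordered list of key/bytes pairs.
final class TraceHeaderRoundTrip {

    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, String value) {
            this.key = key;
            // Encode with an explicit charset instead of the platform default.
            this.value = value.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mirrors the lookup lambda: last header with that key, decoded, or null.
    static String extract(List<Header> headers, String key) {
        if (!key.startsWith("elastic-apm")) {
            return null;
        }
        for (int i = headers.size() - 1; i >= 0; i--) {
            Header h = headers.get(i);
            if (h.key.equals(key)) {
                return new String(h.value, StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Header> headers = new ArrayList<>();
        headers.add(new Header("elastic-apm-traceparent",
                "00-c50eb775b8979575b469a17a6e1c78f4-6fe4a442db00fb5b-01"));
        System.out.println(extract(headers, "elastic-apm-traceparent"));
        System.out.println(extract(headers, "content-type"));
    }
}
```

Returning null for unknown or missing keys matters here, since the lookup is also how the absence of a remote parent is signalled.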
The following aspect surrounds the listener execution in order to end the transaction and to capture any exception thrown while it is running.
public class TracingKafkaListenerAspect {
    @Around(value = "@annotation(org.springframework.kafka.annotation.KafkaListener)")
    public void surroundTransaction(ProceedingJoinPoint joinPoint) throws Throwable {
        Transaction transaction = ElasticApm.currentTransaction();
    }

    @AfterThrowing(pointcut = "@annotation(org.springframework.kafka.annotation.KafkaListener)", throwing = "ex")
    public void exceptionTransaction(Exception ex) {
        Transaction transaction = ElasticApm.currentTransaction();
    }
}
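The intent of the aspect (run the listener, report a failure, end the transaction exactly once) can be sketched without Spring or the agent. This is only an illustration under assumptions: `RecordingTransaction` is a hypothetical stand-in that just records calls, and `around` plays the role that an around advice calling `joinPoint.proceed()` would play in the real aspect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class ListenerWrapperSketch {

    // Hypothetical stand-in for the agent's Transaction; it only records calls.
    static final class RecordingTransaction {
        final List<String> events = new ArrayList<>();

        void captureException(Throwable t) {
            events.add("captureException:" + t.getMessage());
        }

        void end() {
            events.add("end");
        }
    }

    // Runs the listener, reports a failure, and ends the transaction once on every path.
    static <T> T around(RecordingTransaction tx, Supplier<T> listener) {
        try {
            return listener.get();
        } catch (RuntimeException e) {
            tx.captureException(e);
            throw e; // rethrow so the container still sees the failure
        } finally {
            tx.end();
        }
    }

    public static void main(String[] args) {
        RecordingTransaction ok = new RecordingTransaction();
        around(ok, () -> "processed");
        System.out.println(ok.events);

        RecordingTransaction failed = new RecordingTransaction();
        try {
            around(failed, () -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException expected) {
            System.out.println(failed.events);
        }
    }
}
```

Keeping the success path, the failure path and the call to `end()` in one try/catch/finally makes it easier to see that `end()` runs once per message, which is harder to verify when the logic is split between two separate advices.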
While this works for the first message consumed, it doesn't work for the following ones because a KafkaListenerEndpointContainer is single-threaded, and I get the following warning in the Java agent logs:
2019-08-05 12:54:39.132 co.elastic.apm.agent.impl.transaction.AbstractSpan - End has already been called: 'kafka#SUBMIT_ARTICLE' 00-c50eb775b8979575b469a17a6e1c78f4-6fe4a442db00fb5b-01 (72a2ee6b)
It seems like we cannot start a new transaction within the same thread.
Is it a limitation or am I missing something here?
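One way to picture what might be happening is the toy model below. It is plain Java and not the agent's code: `ListenerThreadModel`, `Tx` and `Scope` are hypothetical names, and the rule that the "current" transaction is the oldest one still activated on the thread is an assumption made purely for illustration. In the public API, `activate()` returns a `Scope` that is meant to be closed on the same thread, and the model shows what a never-closed scope would do on a thread that is reused for every record.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Models the state bound to the single listener thread (assumption, not agent internals).
final class ListenerThreadModel {

    // Toy transaction: counts calls to end() made after the first one.
    static final class Tx {
        final String name;
        boolean ended;
        int duplicateEnds;

        Tx(String name) {
            this.name = name;
        }

        void end() {
            if (ended) {
                duplicateEnds++; // would correspond to an "End has already been called" warning
            } else {
                ended = true;
            }
        }
    }

    interface Scope extends AutoCloseable {
        @Override
        void close();
    }

    private final Deque<Tx> active = new ArrayDeque<>();

    // Binds tx to the thread; closing the returned scope unbinds it.
    Scope activate(Tx tx) {
        active.push(tx);
        return () -> active.remove(tx);
    }

    // Assumed rule: the current transaction is the oldest one still activated.
    Tx current() {
        return active.peekLast();
    }

    // Handles one record; when closeScope is false the binding leaks into the next record.
    Tx handle(String name, boolean closeScope) {
        Tx started = new Tx(name);       // interceptor: start a transaction for this record
        Scope scope = activate(started); // interceptor: bind it to the listener thread
        try {
            Tx seen = current();         // aspect: look up the thread's current transaction
            seen.end();                  // aspect: end it
            return seen;
        } finally {
            if (closeScope) {
                scope.close();
            }
        }
    }

    public static void main(String[] args) {
        ListenerThreadModel leaky = new ListenerThreadModel();
        leaky.handle("record-1", false);
        Tx second = leaky.handle("record-2", false);
        System.out.println(second.name + " duplicateEnds=" + second.duplicateEnds);

        ListenerThreadModel clean = new ListenerThreadModel();
        clean.handle("record-1", true);
        Tx ok = clean.handle("record-2", true);
        System.out.println(ok.name + " duplicateEnds=" + ok.duplicateEnds);
    }
}
```

In this model the second record on the leaky thread still resolves to the first record's transaction and ends it a second time, whereas closing the scope after each record gives every record its own transaction. Whether the agent behaves this way is exactly what I would like to confirm.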
Thank you
Kibana version: 7.3.0
Elasticsearch version: 7.3.0
APM Server version: 7.3.0
APM Agent language and version: java 1.8.0
Browser version: n/a
Original install method (e.g. download page, yum, deb, from source, etc.) and version: Kubernetes environment (Docker image)
Fresh install or upgraded from other version? Fresh install
Is there anything special in your setup? No
Other components: Spring-boot:2.1.0.RELEASE, spring-kafka:2.2.7.RELEASE