Hi there,
I currently have a microservices architecture made up of Spring Boot services that communicate synchronously via REST and asynchronously via Kafka messages.
While the synchronous communication is fully instrumented out of the box by the Elastic APM Java agent, Kafka isn't instrumented yet, so I'm trying to implement it myself using the Public API.
CODE DETAILS
Producer
The producer injects trace headers into the Kafka message using a ProducerInterceptor.
Kafka Configuration
@Bean
public ProducerFactory<String, Message> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSettings.getBootstrapAddresses());
    configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    return new DefaultKafkaProducerFactory<>(configProps);
}
This interceptor copies the trace headers of the current transaction into the Kafka message headers.
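// Value type is Message to match the ProducerFactory<String, Message> above.
public class TracingProducerInterceptor implements ProducerInterceptor<String, Message> {
    @Override
    public ProducerRecord<String, Message> onSend(ProducerRecord<String, Message> record) {
        // Encode header values with an explicit charset (java.nio.charset.StandardCharsets).
        ElasticApm.currentTransaction().injectTraceHeaders((key, value) ->
            record.headers().add(key, value.getBytes(StandardCharsets.UTF_8))
        );
        return record;
    }
    (...)
}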
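To make the header round trip concrete, here is a minimal, self-contained sketch of what the producer writes and what the consumer reads back. It is only an illustration: a plain Map stands in for Kafka's record headers, the class and method names are hypothetical, and the header name "elastic-apm-traceparent" is an assumption chosen to match the "elastic-apm" prefix check in the consumer. The trace value is the one from the log line in this post.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper; a plain Map stands in for Kafka's record headers. */
public class TraceHeaderRoundTrip {

    // Producer side: store a text header as UTF-8 bytes,
    // the way record.headers().add(key, bytes) would receive it.
    static void inject(Map<String, byte[]> headers, String key, String value) {
        headers.put(key, value.getBytes(StandardCharsets.UTF_8));
    }

    // Consumer side: decode with the same charset, or return null when the header is absent.
    static String extract(Map<String, byte[]> headers, String key) {
        byte[] raw = headers.get(key);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        // Header name is an assumption; the value is taken from the agent log in this post.
        inject(headers, "elastic-apm-traceparent",
                "00-c50eb775b8979575b469a17a6e1c78f4-6fe4a442db00fb5b-01");
        System.out.println(extract(headers, "elastic-apm-traceparent"));
        System.out.println(extract(headers, "missing")); // absent header yields null
    }
}
```

Using the same explicit charset on both sides avoids depending on the JVM default encoding, which may differ between the producer and consumer containers.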
Consumer
The consumer consumes messages and starts a new transaction with a remote parent (startTransactionWithRemoteParent) using a RecordInterceptor and an AOP aspect.
Kafka Configuration
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory(KafkaSettings kafkaSettings) {
    ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordInterceptor(new TracingRecordInterceptor());
    return factory;
}
One of many Kafka listeners
@KafkaListener(topics = "#{kafkaSettings.commandsTopic}")
public void consume(Message msg) {
    messageProcessor.process(msg.getDetails(), msg.getRetries());
}
This interceptor is called just before executing the @KafkaListener method.
public class TracingRecordInterceptor implements RecordInterceptor<String, Message> {
    @Override
    public ConsumerRecord<String, Message> intercept(ConsumerRecord<String, Message> record) {
        Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
            if (key.startsWith("elastic-apm") && record.headers().lastHeader(key) != null) {
                return new String(record.headers().lastHeader(key).value());
            }
            return null;
        });
        transaction.activate();
        transaction.setType("kafka-message");
        transaction.setName("kafka#" + record.value().name);
        transaction.addLabel("topic", record.topic());
        transaction.addLabel("type", record.value().type);
        transaction.addLabel("id", record.value().id);
        return record;
    }
}
The following aspect wraps the execution of the listener method so that the transaction is ended once the message has been processed and any exception thrown along the way is captured.
@Aspect
@Component
public class TracingKafkaListenerAspect {
    @Around(value = "@annotation(org.springframework.kafka.annotation.KafkaListener)")
    public void surroundTransaction(ProceedingJoinPoint joinPoint) throws Throwable {
        Transaction transaction = ElasticApm.currentTransaction();
        joinPoint.proceed();
        transaction.end();
    }

    @AfterThrowing(pointcut = "@annotation(org.springframework.kafka.annotation.KafkaListener)", throwing = "ex")
    public void exceptionTransaction(Exception ex) {
        Transaction transaction = ElasticApm.currentTransaction();
        transaction.captureException(ex);
    }
}
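For reference, the lifecycle this aspect is aiming for (run the listener, record any exception, end the transaction exactly once) can be sketched without Spring or the agent. This is only a rough illustration under stated assumptions: FakeTransaction is a hypothetical stand-in, not the Elastic APM Transaction class, and surround() is an illustrative name rather than the aspect above.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerLifecycleSketch {

    /** Hypothetical stand-in for a transaction; not the Elastic APM API. */
    static class FakeTransaction {
        final List<String> events = new ArrayList<>();

        void captureException(Throwable t) {
            events.add("captured:" + t.getMessage());
        }

        void end() {
            events.add("end");
        }
    }

    /** Runs the listener body, records any exception, and ends the transaction exactly once. */
    static void surround(FakeTransaction tx, Runnable listener) {
        try {
            listener.run();
        } catch (RuntimeException ex) {
            tx.captureException(ex);
            throw ex; // rethrow so the caller still sees the failure
        } finally {
            tx.end(); // reached on both the success and the failure path
        }
    }

    public static void main(String[] args) {
        FakeTransaction ok = new FakeTransaction();
        surround(ok, () -> { });
        System.out.println(ok.events); // [end]

        FakeTransaction failed = new FakeTransaction();
        try {
            surround(failed, () -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            // the exception still propagates to the caller
        }
        System.out.println(failed.events); // [captured:boom, end]
    }
}
```

The point of the try/finally shape is only that the end of the transaction does not depend on the listener returning normally.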
ISSUE
While this works for the first message consumed, it doesn't work for the following ones, apparently because a KafkaListenerEndpointContainer is single-threaded. I get the following warning in the Java agent logs:
2019-08-05 12:54:39.132 [org.springframework.kafka.KafkaListenerEndpointContainer#8-0-C-1] WARN co.elastic.apm.agent.impl.transaction.AbstractSpan - End has already been called: 'kafka#SUBMIT_ARTICLE' 00-c50eb775b8979575b469a17a6e1c78f4-6fe4a442db00fb5b-01 (72a2ee6b)
It seems like we cannot start a new transaction within the same thread.
Is this a limitation, or am I missing something here?
Thank you
Greg
INFO
Kibana version: 7.3.0
Elasticsearch version: 7.3.0
APM Server version: 7.3.0
APM Agent language and version: java 1.8.0
Browser version: n/a
Original install method (e.g. download page, yum, deb, from source, etc.) and version: Kubernetes environment (Docker image)
Fresh install or upgraded from other version? Fresh install
Is there anything special in your setup? No
Other components: Spring-boot:2.1.0.RELEASE, spring-kafka:2.2.7.RELEASE