Spring-kafka instrumentation - KafkaListenerEndpointContainer mono-thread

Hi there,

I currently have a microservices architecture composed of Spring Boot microservices that communicate synchronously via REST and asynchronously via Kafka messages.

While the synchronous communication is fully instrumented out of the box by the Elastic APM Java agent, Kafka, on the other hand, isn't instrumented yet, so I'm trying to implement it myself using the Public API.


CODE DETAILS

Producer
The producer injects trace headers into the Kafka message using a ProducerInterceptor.

Kafka Configuration

    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaSettings.getBootstrapAddresses());
        configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }

This interceptor passes the Trace headers into the Kafka message headers.

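public class TracingProducerInterceptor implements ProducerInterceptor<String, Message> {
    @Override
    public ProducerRecord<String, Message> onSend(ProducerRecord<String, Message> record) {
        // Copy each trace header into the Kafka record headers, using an explicit
        // charset (java.nio.charset.StandardCharsets) instead of the platform default.
        ElasticApm.currentTransaction().injectTraceHeaders((key, value) ->
            record.headers().add(key, value.getBytes(StandardCharsets.UTF_8))
        );
        return record;
    }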
   (...)
} 

Consumer
The consumer consumes messages and starts a new transaction with a remote parent (startTransactionWithRemoteParent) using a RecordInterceptor and an AOP aspect.

Kafka Configuration

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory(KafkaSettings kafkaSettings) {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setRecordInterceptor(new TracingRecordInterceptor());
        return factory;
    }

One of many Kafka listeners

    @KafkaListener(topics = "#{kafkaSettings.commandsTopic}")
    public void consume(Message msg) {
        messageProcessor.process(msg.getDetails(), msg.getRetries());
    }

This interceptor is called just before executing the @KafkaListener method.

public class TracingRecordInterceptor implements RecordInterceptor<String, Message> {

    @Override
    public ConsumerRecord<String, Message> intercept(ConsumerRecord<String, Message> record) {
        Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
            if(key.startsWith("elastic-apm") && record.headers().lastHeader(key) != null) {
                return new String(record.headers().lastHeader(key).value());
            } 
            return null;
        });
        transaction.activate();
        transaction.setType("kafka-message");
        transaction.setName("kafka#"+record.value().name);
        transaction.addLabel("topic", record.topic());
        transaction.addLabel("type", record.value().type);
        transaction.addLabel("id", record.value().id);

        return record;
    }
}
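For readers following along, here is a minimal, JDK-only sketch of the header round trip that the two lambdas above perform. It is not the agent's API: a plain Map stands in for the Kafka record headers, BiConsumer and Function stand in for the callbacks passed to injectTraceHeaders and startTransactionWithRemoteParent, and TraceHeaderCodec and the header name "elastic-apm-traceparent" are assumptions chosen for illustration. The "elastic-apm" prefix check from the post is left out for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of Kafka or the APM agent.
final class TraceHeaderCodec {

    // Producer side: store each header value as UTF-8 bytes under its key.
    static BiConsumer<String, String> injector(Map<String, byte[]> headers) {
        return (key, value) -> headers.put(key, value.getBytes(StandardCharsets.UTF_8));
    }

    // Consumer side: decode the bytes back to a String, or return null
    // when the header is absent.
    static Function<String, String> extractor(Map<String, byte[]> headers) {
        return key -> {
            byte[] raw = headers.get(key);
            return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
        };
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        // Header name is an assumption; the value is the trace id from the log in this thread.
        injector(headers).accept("elastic-apm-traceparent",
                "00-c50eb775b8979575b469a17a6e1c78f4-6fe4a442db00fb5b-01");
        System.out.println(extractor(headers).apply("elastic-apm-traceparent"));
        System.out.println(extractor(headers).apply("not-present"));
    }
}
```

Using the same explicit charset on both sides keeps the round trip independent of the platform default encoding of the producer and consumer hosts.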

The following aspect is used to surround the execution of the message and capture the end or an exception during the transaction.

@Aspect
@Component
public class TracingKafkaListenerAspect {
    @Around(value = "@annotation(org.springframework.kafka.annotation.KafkaListener)")
    public void surroundTransaction(ProceedingJoinPoint joinPoint) throws Throwable {
        Transaction transaction = ElasticApm.currentTransaction();
        joinPoint.proceed();   
        transaction.end();
    }
    @AfterThrowing(pointcut="@annotation(org.springframework.kafka.annotation.KafkaListener)", throwing="ex")
    public void exceptionTransaction(Exception ex) {
        Transaction transaction = ElasticApm.currentTransaction();
        transaction.captureException(ex);
     }
}

ISSUE

While this works for the first message consumed, it doesn't work for subsequent ones, because a KafkaListenerEndpointContainer is single-threaded, and I get the following warning in the Java agent logs:

2019-08-05 12:54:39.132 
[org.springframework.kafka.KafkaListenerEndpointContainer#8-0-C-1] 
WARN 
co.elastic.apm.agent.impl.transaction.AbstractSpan - 
End has already been called: 'kafka#SUBMIT_ARTICLE' 00-c50eb775b8979575b469a17a6e1c78f4-6fe4a442db00fb5b-01 (72a2ee6b)

It seems like we cannot start a new transaction within the same thread.

Is it a limitation or am I missing something here?

Thank you

Greg


INFO

Kibana version: 7.3.0
Elasticsearch version: 7.3.0
APM Server version: 7.3.0
APM Agent language and version: java 1.8.0
Browser version: n/a
Original install method (e.g. download page, yum, deb, from source, etc.) and version: Kubernetes environment (Docker image)
Fresh install or upgraded from other version? Fresh install
Is there anything special in your setup? No
Other components: Spring-boot:2.1.0.RELEASE, spring-kafka:2.2.7.RELEASE

Hi and welcome to the APM forum :slight_smile:

Besides ending the transaction, you also need to deactivate it. Unfortunately, there is currently no API for doing that on arbitrary Transactions/Spans. Instead, the activate method returns a Scope (ignored in your current implementation), and closing that Scope is what deactivates the transaction.

I will add an issue to look into adding a deactivation API, but in the meantime, try to find a way to store the Scope and close it when you end the transaction.

BTW, you must ensure that transactions are always ended and that activated transactions/spans are always deactivated. In your TracingKafkaListenerAspect, would surroundTransaction always be invoked, even if exceptionTransaction is invoked?
Also, exceptionTransaction will only work if it runs before surroundTransaction ends the transaction.

ThreadLocal would probably be useful for that.
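As a rough sketch of that idea, assuming the Scope returned by activate() can be treated as a plain AutoCloseable: a small per-thread holder lets one callback store the scope and a later callback on the same thread close it. ActiveScopeHolder and its method names are hypothetical, and the code uses only the JDK so it does not depend on the agent.

```java
// Hypothetical helper: remembers the scope activated on the current thread
// so that a later callback on the same thread can close it.
final class ActiveScopeHolder {

    private static final ThreadLocal<AutoCloseable> CURRENT = new ThreadLocal<>();

    // Call right after activating, e.g. store(transaction.activate()).
    static void store(AutoCloseable scope) {
        CURRENT.set(scope);
    }

    static boolean hasScope() {
        return CURRENT.get() != null;
    }

    // Closes and forgets the stored scope; does nothing if none is stored.
    static void closeAndClear() {
        AutoCloseable scope = CURRENT.get();
        CURRENT.remove();
        if (scope == null) {
            return;
        }
        try {
            scope.close();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close scope", e);
        }
    }

    public static void main(String[] args) {
        store(() -> System.out.println("scope closed"));
        System.out.println("has scope: " + hasScope());
        closeAndClear();
        System.out.println("has scope: " + hasScope());
    }
}
```

The listener container reuses the same thread for every message, so the holder is cleared on close; otherwise a stale scope from one message could leak into the next.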

One more option: if you do everything in the listener (or MessageProcessor) on the consumer side instead of through the interceptor, it may make everything much simpler. For example, if all your listeners extend an ElasticApmAwareKafkaListener, it can decorate the actual listeners: start and activate a transaction before calling the real consume, then close the scope and end the transaction after.
Your MessageProcessor#process may also be a good candidate for that.
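A minimal sketch of what such a base class could look like, with the tracing calls abstracted away so it compiles with the JDK alone. TraceHandle, TracedListener and their methods are hypothetical names; with the Public API, startTrace would start and activate the transaction, and close() would close the Scope and then end the transaction.

```java
// Hypothetical handle around one traced unit of work.
interface TraceHandle extends AutoCloseable {
    void captureException(Throwable error);

    @Override
    void close(); // deactivate and end; declared without checked exceptions
}

// Hypothetical base class in the spirit of "ElasticApmAwareKafkaListener".
abstract class TracedListener<T> {

    protected abstract TraceHandle startTrace(T message);

    protected abstract void handle(T message);

    // Template method: tracing is set up before the real work and torn down
    // afterwards, even when the real work throws.
    public final void consume(T message) {
        TraceHandle trace = startTrace(message);
        try {
            handle(message);
        } catch (RuntimeException e) {
            trace.captureException(e);
            throw e; // keep the failure visible to the caller
        } finally {
            trace.close();
        }
    }
}

final class TracedListenerDemo {
    public static void main(String[] args) {
        TracedListener<String> listener = new TracedListener<String>() {
            @Override
            protected TraceHandle startTrace(String message) {
                System.out.println("start " + message);
                return new TraceHandle() {
                    @Override
                    public void captureException(Throwable error) {
                        System.out.println("captured " + error.getMessage());
                    }

                    @Override
                    public void close() {
                        System.out.println("end");
                    }
                };
            }

            @Override
            protected void handle(String message) {
                System.out.println("handling " + message);
            }
        };
        listener.consume("SUBMIT_ARTICLE");
    }
}
```

The trade-off against the interceptor approach is that every listener has to extend the base class, which may not be attractive when there are many listeners across many services.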

Hi,

Thanks for your answers and for pointing me to Scope. It's working now without impacting the original code too much.

For those interested, here is the code:

Producer
The producer injects the trace headers into the Kafka message using a ProducerInterceptor.

Kafka Configuration

    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaSettings.getBootstrapAddresses());
        configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }

This interceptor passes the Trace headers into the Kafka message headers.

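public class TracingProducerInterceptor implements ProducerInterceptor<String, Message> {
    @Override
    public ProducerRecord<String, Message> onSend(ProducerRecord<String, Message> record) {
        // Copy each trace header into the Kafka record headers, using an explicit
        // charset (java.nio.charset.StandardCharsets) instead of the platform default.
        ElasticApm.currentTransaction().injectTraceHeaders((key, value) ->
            record.headers().add(key, value.getBytes(StandardCharsets.UTF_8))
        );
        return record;
    }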
   (...)
} 

Consumer
The consumer consumes messages and starts a new transaction with a remote parent (startTransactionWithRemoteParent) using a RecordInterceptor and an AOP aspect.

Kafka Configuration

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory(KafkaSettings kafkaSettings) {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setRecordInterceptor(new TracingKafkaRecordInterceptor());
        return factory;
    }

One of many Kafka listeners (I don't want to touch that as I have many of them in many services)

    @KafkaListener(topics = "#{kafkaSettings.commandsTopic}")
    public void consume(Message msg) {
        messageProcessor.process(msg.getDetails(), msg.getRetries());
    }

This interceptor is called just before the @KafkaListener method is executed and is used to capture the ConsumerRecord into a ThreadLocal.

public class TracingKafkaRecordInterceptor implements RecordInterceptor<String, Message> {

    @Override
    public ConsumerRecord<String, Message> intercept(ConsumerRecord<String, Message> record) {
        TracingKafkaListenerAspect.setConsumerRecord(record);
        return record;
    }
}

Finally, the following aspect retrieves the ConsumerRecord (to capture some metadata) and wraps the processing of the message in a transaction.

@Aspect @Component
public class TracingKafkaListenerAspect {
    
    // Holds the record captured by TracingKafkaRecordInterceptor on the current listener thread.
    private static final ThreadLocal<ConsumerRecord<String, Message>> consumerRecords = new ThreadLocal<>();
    
    public static void setConsumerRecord(ConsumerRecord<String, Message> record) {
        consumerRecords.set(record);
    }
    
    @Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
    public void surroundOnMessage(ProceedingJoinPoint joinPoint) throws Throwable {
        ConsumerRecord<String, Message> record = consumerRecords.get();
            
        // Continue the trace started by the producer, using the headers it injected.
        Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
            if(key.startsWith("elastic-apm") && record.headers().lastHeader(key) != null) {
                return new String(record.headers().lastHeader(key).value(), StandardCharsets.UTF_8);
            } 
            return null;
        });
            
        transaction.setType("kafka-message");
        transaction.setName("kafka#"+record.value().getType());
        transaction.addLabel("topic", record.topic());
        transaction.addLabel("type", record.value().getType());
        transaction.addLabel("id", record.value().getId());
            
        // Closing the Scope deactivates the transaction on this thread before it is ended.
        try (final Scope scope = transaction.activate()) {
            joinPoint.proceed(joinPoint.getArgs()); 
        } catch (Exception e) {
            transaction.captureException(e);
            throw e; // rethrow so the container's error handling still sees the failure
        } finally {
            transaction.end();
            consumerRecords.remove(); // do not leak the record into the next message on this thread
        }
    }
}
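A note on the try block in the aspect above: in Java, a resource declared in try-with-resources is closed before any catch or finally block of the same statement runs, so the scope is closed (the transaction deactivated) before the exception is captured and the transaction is ended. The following JDK-only sketch records that order; CleanupOrderDemo is a hypothetical name and the lambda merely stands in for the Scope.

```java
import java.util.ArrayList;
import java.util.List;

final class CleanupOrderDemo {

    // Records the order in which the steps of a try-with-resources
    // statement with catch and finally are executed.
    static List<String> run(boolean fail) {
        List<String> events = new ArrayList<>();
        try (AutoCloseable scope = () -> events.add("scope closed")) {
            events.add("listener invoked");
            if (fail) {
                throw new IllegalStateException("boom");
            }
        } catch (Exception e) {
            events.add("exception captured");
        } finally {
            events.add("transaction ended");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

With fail set to true, the recorded order is: listener invoked, scope closed, exception captured, transaction ended. Without a failure, the "exception captured" step is simply absent.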

Greg


Thanks a lot Greg! I am sure lots of users will find this useful!
Relying on org.springframework.kafka.annotation.KafkaListener looks especially useful, as it provides a way to trace "Kafka-consumer-transactions" properly, even though the record consumption is done through polling APIs, which are not that useful for transaction tracing.
