Elastic APM not working out of the box with a custom ConcurrentKafkaListenerContainerFactory

Kibana version:
7.12.1
Elasticsearch version:
7.12.1
APM Server version:
7.12.1
APM Agent language and version:
"co.elastic.apm:apm-agent-attach:1.25.0"
Original install method (e.g. download page, yum, deb, from source, etc.) and version:

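import co.elastic.apm.attach.ElasticApmAttacher;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Setter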
@Configuration
@ConfigurationProperties(prefix = "app.elastic-apm")
public class ElasticApmConfiguration {

    private static final String SERVER_URL_KEY = "server_url";
    private String serverUrl;

    private static final String SERVICE_NAME_KEY = "service_name";
    private String serviceName;

    private static final String ENVIRONMENT_KEY = "environment";
    private String environment;

    private static final String APPLICATION_PACKAGES_KEY = "application_packages";
    private String applicationPackages;

    @PostConstruct
    public void init() {
        Map<String, String> apmProps = new HashMap<>();
        apmProps.put(SERVER_URL_KEY, serverUrl);
        apmProps.put(SERVICE_NAME_KEY, serviceName);
        apmProps.put(ENVIRONMENT_KEY, environment);
        apmProps.put(APPLICATION_PACKAGES_KEY, applicationPackages);

        ElasticApmAttacher.attach(apmProps);
    }
}

Description of the problem including expected versus actual behavior. Please include screenshots (if relevant):

Hi, I have a simple Spring Boot + Kafka + APM app with a @KafkaListener like this:

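import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@EnableKafka
@Component
@RequiredArgsConstructor
public class CampaignCleanedListener {

    // ...deps...

    @KafkaListener(topics = "${app.kafka.topics.mytopic}")
    public void consume(@Payload String input, Acknowledgment acknowledgment) {
        // ...my logic...
        // commit the offset manually once processing is done
        acknowledgment.acknowledge();
    }
}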

Everything works smoothly: in Kibana's APM graphs I can see that the average time to process a message is about 500 ms.
However, when I add a custom ConcurrentKafkaListenerContainerFactory like this:

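import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.RetryingBatchErrorHandler;
import org.springframework.util.backoff.ExponentialBackOff;

@Configuration
public class MyConfiguration {
    @Bean
    @SuppressWarnings("unchecked")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        KafkaAutoConfiguration kafkaAutoConfiguration,
        ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // apply Boot's spring.kafka.listener.* settings and use the auto-configured consumer factory
        configurer.configure(factory, (ConsumerFactory<Object, Object>) kafkaAutoConfiguration.kafkaConsumerFactory(customizers));

        // hand the records from each poll() to the listener as one batch
        factory.setBatchListener(true);
        // retry a failed batch with exponential back-off; null means no custom recoverer
        factory.setBatchErrorHandler(new RetryingBatchErrorHandler(new ExponentialBackOff(), null));

        return factory;
    }
}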

it suddenly stops working properly and shows implausible values, such as 3 μs per message; I am not even sure what it is measuring. Is this a bug, or is it working as expected? If it is expected, how can I make APM work with a custom ConcurrentKafkaListenerContainerFactory?
Steps to reproduce:

  1. Create a Spring Boot app with a default @KafkaListener
  2. Add a custom ConcurrentKafkaListenerContainerFactory bean
  3. Observe that the average times in the APM graph are far lower than they should be (microseconds instead of roughly 500 ms)
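For scale, the two figures reported above (roughly 500 ms per message without the custom factory, 3 μs with it) differ by about five orders of magnitude:

$$
\frac{500\ \text{ms}}{3\ \mu\text{s}} = \frac{500 \times 10^{-3}\ \text{s}}{3 \times 10^{-6}\ \text{s}} \approx 1.7 \times 10^{5}
$$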

Hiya, it might take until next week to get back to you due to our current workload. Kafka communication is asynchronous; I suspect we no longer correctly identify an ongoing transaction with the custom consumer once you install the factory. You may be able to trace the methods your factory produces and get the correct timings that way.
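One way to act on the suggestion to trace the listener methods is the agent's trace_methods option, which can go into the same property map the question hands to ElasticApmAttacher.attach(...). This is a minimal sketch: the helper class ApmProps, the com.example package, the server URL and the service name are all assumptions made for illustration. The attach call itself is left as a comment so the snippet runs without the agent on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the agent): builds the kind of property map the
// question passes to ElasticApmAttacher.attach(...), plus the trace_methods option.
final class ApmProps {

    private ApmProps() {
    }

    static Map<String, String> withTracedListener(String serverUrl,
                                                  String serviceName,
                                                  String tracedMethods) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("server_url", serverUrl);
        props.put("service_name", serviceName);
        // trace_methods: comma-separated method matchers such as "org.example.MyClass#myMethod"
        props.put("trace_methods", tracedMethods);
        return props;
    }

    public static void main(String[] args) {
        // com.example is an assumed package; the original post does not show one
        Map<String, String> props = withTracedListener(
                "http://localhost:8200",
                "my-service",
                "com.example.CampaignCleanedListener#consume");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        // In the real application: ElasticApmAttacher.attach(props);
    }
}
```

The agent creates a transaction or a span for each matching method, so depending on what is already active when the listener runs, the traced method may show up under either view in Kibana.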

As far as I know, there is no dedicated spring-kafka support, only the "normal" Kafka instrumentation. Currently, the transaction is started when poll is called on the Consumer and ended when the next poll is called. I assume that when you use ConcurrentKafkaListenerContainerFactory, poll is called multiple times in a row, prematurely closing any previously started transaction. You can try setting the (internal) property end_messaging_transaction_on_poll to false.
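The lifecycle described here can be pictured with a small toy model. It is not the agent's implementation: the class PollToPollTimer and its fake microsecond clock are hypothetical and only mimic "start on poll(), end on the next poll()", to show how two back-to-back polls close the first transaction almost immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "a transaction starts at poll() and ends at the next poll()".
// NOT the agent's code; time comes from a fake clock so durations are deterministic.
final class PollToPollTimer {

    private long nowMicros = 0;          // fake clock, in microseconds
    private Long openedAt = null;        // start of the currently open transaction
    private final List<Long> durations = new ArrayList<>();

    void poll() {
        if (openedAt != null) {
            // the next poll() ends the previously started transaction
            durations.add(nowMicros - openedAt);
        }
        openedAt = nowMicros;
    }

    void advance(long micros) {
        nowMicros += micros;
    }

    List<Long> durations() {
        return durations;
    }

    public static void main(String[] args) {
        // poll, ~500 ms of processing, poll again
        PollToPollTimer sequential = new PollToPollTimer();
        sequential.poll();
        sequential.advance(500_000);
        sequential.poll();
        System.out.println("poll/process/poll:  " + sequential.durations());

        // poll() twice in a row before processing: the first transaction closes almost at once
        PollToPollTimer backToBack = new PollToPollTimer();
        backToBack.poll();
        backToBack.advance(3);
        backToBack.poll();
        backToBack.advance(500_000);
        backToBack.poll();
        System.out.println("back-to-back polls: " + backToBack.durations());
    }
}
```

In this model a premature poll() produces one near-zero transaction per extra call, which is the kind of skew the reply suspects.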

I changed my ElasticApmConfiguration to:

@Setter
@Configuration
@ConfigurationProperties(prefix = "app.elastic-apm")
public class ElasticApmConfiguration {

    private static final String SERVER_URL_KEY = "server_url";
    private String serverUrl;

    private static final String SERVICE_NAME_KEY = "service_name";
    private String serviceName;

    private static final String ENVIRONMENT_KEY = "environment";
    private String environment;

    private static final String APPLICATION_PACKAGES_KEY = "application_packages";
    private String applicationPackages;

    @PostConstruct
    public void init() {
        Map<String, String> apmProps = new HashMap<>();
        apmProps.put(SERVER_URL_KEY, serverUrl);
        apmProps.put(SERVICE_NAME_KEY, serviceName);
        apmProps.put(ENVIRONMENT_KEY, environment);
        apmProps.put(APPLICATION_PACKAGES_KEY, applicationPackages);
        apmProps.put("end_messaging_transaction_on_poll", "false");
        ElasticApmAttacher.attach(apmProps);
    }
}

but it didn't help; the values were still completely inaccurate. What I have found so far is that the problem appears when I call factory.setBatchListener(true) in my ConcurrentKafkaListenerContainerFactory bean. If I comment out this line, everything works smoothly.
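A possible explanation for why batching specifically collapses the per-message numbers can be sketched as a toy model. It rests on an assumption this thread does not confirm: that a per-record timer starts when a record is taken from the polled records and stops when the next one is requested (or the records run out). In record mode the listener runs inside that window; if the container instead gathers all records into a list before calling a batch listener, each window only covers the copy. PerRecordTimingModel and its fake clock are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model under an ASSUMPTION (not the agent's real instrumentation): a per-record
// timer starts when a record is handed out and stops when the next record is
// requested or the records run out. A fake microsecond clock keeps it deterministic.
final class PerRecordTimingModel {

    private long nowMicros = 0;
    private Long currentStart = null;
    private final List<Long> recordDurations = new ArrayList<>();

    private void advance(long micros) {
        nowMicros += micros;
    }

    private void closeCurrent() {
        if (currentStart != null) {
            recordDurations.add(nowMicros - currentStart);
            currentStart = null;
        }
    }

    // Wraps the polled records so that iterating over them drives the timer.
    private <T> Iterator<T> timedIterator(List<T> polled) {
        Iterator<T> delegate = polled.iterator();
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                boolean more = delegate.hasNext();
                if (!more) {
                    closeCurrent(); // records exhausted: stop the last record's timer
                }
                return more;
            }

            @Override
            public T next() {
                closeCurrent(); // asking for the next record stops the previous timer
                T record = delegate.next();
                currentStart = nowMicros;
                return record;
            }
        };
    }

    // Record mode: the listener runs once per record, inside each timed window.
    static List<Long> recordMode(List<String> polled, long processMicrosPerRecord) {
        PerRecordTimingModel model = new PerRecordTimingModel();
        Iterator<String> it = model.timedIterator(polled);
        while (it.hasNext()) {
            it.next();
            model.advance(processMicrosPerRecord);
        }
        return model.recordDurations;
    }

    // Batch mode: records are first copied into a list (1 microsecond each in this toy),
    // then the listener processes the whole list after iteration has finished.
    static List<Long> batchMode(List<String> polled, long processMicrosPerRecord) {
        PerRecordTimingModel model = new PerRecordTimingModel();
        Iterator<String> it = model.timedIterator(polled);
        List<String> batch = new ArrayList<>();
        while (it.hasNext()) {
            batch.add(it.next());
            model.advance(1);
        }
        model.advance(processMicrosPerRecord * batch.size());
        return model.recordDurations;
    }

    public static void main(String[] args) {
        List<String> polled = List.of("a", "b", "c");
        System.out.println("record mode: " + recordMode(polled, 500_000));
        System.out.println("batch mode:  " + batchMode(polled, 500_000));
    }
}
```

If the real instrumentation behaves anything like this, the processing time in batch mode falls outside every per-record window, which would match the microsecond readings; a standalone reproduction is what could confirm or refute it.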

Just to confirm: with DEBUG logging enabled for the agent, you see transactions listed for the messages, and the only difference between batching on and off is the timing?

If so, this is likely some kind of Spring concurrent execution handling that we don't instrument. Are you able to look into this yourself? For reference, our current concurrency plugin lives in apm-agent-java/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent (elastic/apm-agent-java on GitHub, at commit 2e40422635e573bf83da84c67a325d2bf629a91a).
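The concurrency plugin mentioned here is about carrying the active transaction from the thread that submits work to the thread that runs it. As a conceptual illustration only, using a plain ThreadLocal and hypothetical names (ContextPropagationDemo, ACTIVE_TRANSACTION) rather than the agent's API, this shows what a worker thread sees with and without wrapping the submitted task:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Conceptual illustration only (not the plugin's code): an "active transaction" kept in
// a ThreadLocal is invisible on a worker thread unless the submitted task is wrapped so
// that it captures the caller's value and restores it while running.
final class ContextPropagationDemo {

    static final ThreadLocal<String> ACTIVE_TRANSACTION = new ThreadLocal<>();

    // Captures the submitting thread's context and re-activates it on the worker.
    static Runnable wrap(Runnable task) {
        String captured = ACTIVE_TRANSACTION.get();
        return () -> {
            String previous = ACTIVE_TRANSACTION.get();
            ACTIVE_TRANSACTION.set(captured);
            try {
                task.run();
            } finally {
                // pooled threads are reused, so put back whatever was there before
                ACTIVE_TRANSACTION.set(previous);
            }
        };
    }

    // Returns the transaction name the worker thread observes.
    static String seenOnWorker(boolean propagate) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            ACTIVE_TRANSACTION.set("kafka-transaction");
            String[] seen = new String[1];
            Runnable task = () -> seen[0] = ACTIVE_TRANSACTION.get();
            Future<?> done = pool.submit(propagate ? wrap(task) : task);
            done.get(); // waits for the task and makes its write visible here
            return seen[0];
        } finally {
            ACTIVE_TRANSACTION.remove();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("without wrapping: " + seenOnWorker(false));
        System.out.println("with wrapping:    " + seenOnWorker(true));
    }
}
```

Capturing at submit time and restoring afterwards is the key design choice: without the restore step, a reused worker thread would leak one task's context into the next.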

Alternatively, would you be able to create a standalone example that shows the issue?
