Invalid transactions grouping with ActiveMQ

Kibana version : 7.9.2

Elasticsearch version : 7.9.2

APM Server version : 7.9.2

APM Agent language and version : JAVA apm-agent-attach:1.17.0, apm-agent-api:1.17.0

Java version : openjdk:11

We have Spring-boot application with the following APM config:

service_name=app-name
server_urls=http://apm-server-ops.app.com
transaction_sample_rate=1
application_packages=com.app.package
ignore_urls=/actuator*, /swagger-ui*
enable_log_correlation=true
transaction_max_spans=1000
span_min_duration=500ms

I have been struggling with this warning (originally reported here):

2020-09-17 14:15:30,649 [DefaultMessageListenerContainer-2] WARN  co.elastic.apm.agent.impl.transaction.Span - 
Max spans (500) for transaction 'JMS RECEIVE from queue Consumer.SmsServiceImpl.VirtualTopic.****CreatedEvent' 00-6efa3ce85265c5f6d3f8d53feed3d11f-acdeb259e07a8286-00 (5084b178) has been reached. 
For this transaction and possibly others, further spans will be dropped. See config param 'transaction_max_spans'.

Based on the recommendation, I increased the transaction_max_spans and span_min_duration params. However, I still see these warnings.

Generally, I can say that such a large number of spans (more than 1000 with the current config) does not fit our use case:
The application is not heavily loaded: it processes 1-2 HTTP requests per 30 seconds, each of which creates an entity (persisted in the DB). Once the entity is created, the application emits an event to ActiveMQ, which is picked up by five or six ActiveMQ listeners living in the same application. The listeners do simple jobs such as sending an SMS, sending an email, or performing tracking. Every listener makes at most 2-3 SQL requests and calls some HTTP service.

After some investigation, I see that quite a few ActiveMQ transactions are grouped under one incorrect transaction ID (the trace ID is also the same).

Here is a filter of logs by transaction.id from our Kibana over 4 days:

As you can see, it finds over 600 (!) log entries from totally different, unrelated ActiveMQ events, which are published upon creation of totally different entities and so should share neither a trace.id nor a transaction.id.

From my understanding, if I create entity A, persist it into the DB, and emit the corresponding EntityCreatedEvent, which is picked up by several ActiveMQ listeners, then only this flow should be grouped under one trace.id. It should not be mixed up with the flow caused by the creation of entity B (which should be grouped under a different trace.id). Every event listener (Listener-1, Listener-2, etc.) then creates a separate transaction with a unique transaction.id, so I expect:
transaction.id: Http-Session-1 trace.id: event-A
transaction.id: Listener-1-event-A-1 trace.id: event-A
transaction.id: Listener-2-event-A-2 trace.id: event-A
transaction.id: Listener-3-event-A-3 trace.id: event-A


transaction.id: Http-Session-2 trace.id: event-B
transaction.id: Listener-1-event-B-1 trace.id: event-B
transaction.id: Listener-2-event-B-2 trace.id: event-B
transaction.id: Listener-3-event-B-3 trace.id: event-B

But in my Kibana (as in the screenshot) I basically see that
transaction.id: Listener-1-event-A-1 trace.id: event-A
transaction.id: Listener-1-event-B-1 trace.id: event-B
are being tracked under the same trace.id and transaction.id.

This logically results in more than 1000 spans being attached to one (incorrect) transaction ID.
Is this a bug in the APM agent, or are my assumptions incorrect?
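
To make the expected grouping concrete, here is a minimal sketch of the model described above. It is not agent code: the `Tx` class and the `startRootTransaction` / `startListenerTransaction` helpers are hypothetical names used only for illustration, and random UUIDs stand in for real trace and transaction IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class ExpectedGroupingSketch {
    /** Hypothetical stand-in for a transaction: one trace.id plus its own transaction.id. */
    static final class Tx {
        final String traceId;
        final String transactionId;

        Tx(String traceId, String transactionId) {
            this.traceId = traceId;
            this.transactionId = transactionId;
        }
    }

    /** The HTTP request that creates an entity starts a brand-new trace. */
    static Tx startRootTransaction() {
        return new Tx(UUID.randomUUID().toString(), UUID.randomUUID().toString());
    }

    /** Each listener continues the parent's trace but gets its own transaction.id. */
    static Tx startListenerTransaction(Tx parent) {
        return new Tx(parent.traceId, UUID.randomUUID().toString());
    }

    /** One entity creation: one HTTP transaction plus one transaction per listener. */
    static List<Tx> handleEntityCreation(int listenerCount) {
        List<Tx> flow = new ArrayList<>();
        Tx http = startRootTransaction();
        flow.add(http);
        for (int i = 0; i < listenerCount; i++) {
            flow.add(startListenerTransaction(http));
        }
        return flow;
    }

    public static void main(String[] args) {
        // Two independent entity creations (A and B), three listeners each.
        for (List<Tx> flow : List.of(handleEntityCreation(3), handleEntityCreation(3))) {
            for (Tx tx : flow) {
                System.out.println("transaction.id: " + tx.transactionId + " trace.id: " + tx.traceId);
            }
            System.out.println();
        }
    }
}
```

Under this model, transactions from entity A and entity B can never share a trace.id, and no two listeners can share a transaction.id, which is why the Kibana result looks wrong to me.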

Hi and thanks for the question!
Are you using a MessageListener, or your own listener that actually polls the queue using a receive API?

we are using spring-jms:

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>5.2.6.RELEASE</version>
    </dependency>

here is the code:

    org.springframework.jms.config.SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
    endpoint.setId("id");
    endpoint.setDestination("destination");
    endpoint.setMessageListener((message) -> { ... } );
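    // registrar is the JmsListenerEndpointRegistrar; the cast must wrap the variable, not the void call
    ((org.springframework.jms.config.JmsListenerEndpointRegistrar) registrar).registerEndpoint(endpoint, this.containerFactory);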

Thanks!
Please set log_level to DEBUG and provide a log from startup and until after a few messages have been sent and received.
If you can put your Spring app JMS-related configurations, that may help as well.

Okay, I have uploaded logs here:


The password has been sent to you via private message.

The logs are from our dev environment. I am not sure the issue is reproducible there due to the very low load,
but I hope they help. If you need logs from production, I can get those as well.

And here is the config of the connection itself:

import com.application.infrastructure.events.activemq.ActiveMqEventMessageConverter;
import com.application.infrastructure.events.serializer.JsonEventSerializer;

import javax.inject.Named;
import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;

@Configuration
public class ActiveMqJmsConfig {
public static final String ACTIVE_MQ_CONTAINER_FACTORY = "ActiveMqContainerFactory";
public static final String ACTIVE_MQ_MESSAGE_CONVERTER = "ActiveMqMessageConverter";
public static final String ACTIVE_MQ_TEMPLATE = "ActiveMqTemplate";
private final String url;
private final String username;
private final String password;

public ActiveMqJmsConfig(@Value("${spring.activemq.broker-url}") String url, @Value("${spring.activemq.user}") String username, @Value("${spring.activemq.password}") String password) {
    this.url = url;
    this.username = username;
    this.password = password;
}

@Bean
public RedeliveryPolicy redeliveryPolicy() {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(5);
    return redeliveryPolicy;
}

@Bean
public ActiveMQConnectionFactory activeMqConnectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(this.url);
    connectionFactory.setUserName(this.username);
    connectionFactory.setPassword(this.password);
    connectionFactory.setRedeliveryPolicy(this.redeliveryPolicy());
    connectionFactory.setNonBlockingRedelivery(true);
    return connectionFactory;
}

@Bean
public ConnectionFactory connectionFactory() {
    return new CachingConnectionFactory(this.activeMqConnectionFactory());
}

@Bean(
        name = {"ActiveMqMessageConverter"}
)
public MessageConverter messageConverter(JsonEventSerializer eventSerializer) {
    return new ActiveMqEventMessageConverter(eventSerializer);
}

@Bean(
        name = {"ActiveMqTemplate"}
)
public JmsTemplate jmsTemplate(@Named("ActiveMqMessageConverter") MessageConverter messageConverter) {
    JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(this.connectionFactory());
    template.setMessageConverter(messageConverter);
    return template;
}

@Bean(
        name = {"ActiveMqContainerFactory"}
)
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(@Named("ActiveMqMessageConverter") MessageConverter messageConverter) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(this.connectionFactory());
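    factory.setConcurrency("2-8"); // between 2 and 8 concurrent consumers
    factory.setMessageConverter(messageConverter);
    factory.setErrorHandler((throwable) -> {
        // no-op handler: uncaught exceptions from message processing are swallowed
    });
    factory.setSessionAcknowledgeMode(2); // 2 == javax.jms.Session.CLIENT_ACKNOWLEDGE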
    return factory;
}
}

Hi @Eyal_Koren, I dug a bit into this issue, and these are my findings so far, based on code analysis and some debugging of a simple test:

  1. We use org.springframework.jms.listener.DefaultMessageListenerContainer for JMS subscriptions

  2. This container runs its receive loop (the DefaultMessageListenerContainer.AsyncMessageListenerInvoker task) via DefaultMessageListenerContainer#doRescheduleTask, which uses a Java Executor

  3. doRescheduleTask just executes this.taskExecutor.execute((Runnable) task);
    So the receive loop runs inside the Executor

  4. When any message is received, the container executes the following code:

	protected void messageReceived(Object invoker, Session session) {
		((AsyncMessageListenerInvoker) invoker).setIdle(false);
		scheduleNewInvokerIfAppropriate();
	}

Please note that this code is executed as part of the receive loop, i.e. inside the executor thread.

  5. scheduleNewInvokerIfAppropriate creates a new receive-loop task using the same approach as in step 2, i.e. via the Executor

And here comes my assumption:

  • At step 4 we still have an active transaction, created in JmsMessageConsumerInstrumentation.ReceiveInstrumentation.MessageConsumerAdvice#afterReceive (startJmsTransaction)
  • At step 5 we propagate this transaction to a newly created task (and thread), because of ExecutorInstrumentation
  • So all newly started consumers will inherit the original transaction via ExecutorInstrumentation, and this transaction will never be deactivated (because it will be referenced from multiple places)
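
The assumed mechanism can be sketched in isolation as follows. This is a simplified illustration, not the agent's implementation: the `ACTIVE` thread-local and the `propagating` wrapper are hypothetical stand-ins for the agent's active-transaction tracking and for what ExecutorInstrumentation is assumed to do.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ContextLeakSketch {
    // Hypothetical stand-in for the agent's per-thread "active transaction".
    static final ThreadLocal<String> ACTIVE = new ThreadLocal<>();

    // Hypothetical stand-in for executor instrumentation: the submitting thread's
    // active transaction is captured and re-activated on the worker thread.
    static Runnable propagating(Runnable task) {
        final String captured = ACTIVE.get();
        return () -> {
            ACTIVE.set(captured);
            try {
                task.run();
            } finally {
                // A real receive loop does not return, so this cleanup would never run.
                ACTIVE.remove();
            }
        };
    }

    // Returns the transaction that a newly scheduled receive loop sees when it starts.
    static String transactionSeenByNewReceiveLoop() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Step 4: a message was received, so a transaction is active on this thread.
            ACTIVE.set("tx-for-message-A");
            // Step 5: while it is still active, a new receive-loop task is scheduled.
            CompletableFuture<String> seen = new CompletableFuture<>();
            executor.execute(propagating(() -> seen.complete(ACTIVE.get())));
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ACTIVE.remove();
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("New receive loop starts with: " + transactionSeenByNewReceiveLoop());
    }
}
```

In this sketch the new task starts with the transaction of an unrelated message already active, which is the inheritance assumed in the bullets above.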

As a confirmation of this theory I tried the following changes:

  • Limit consumers count to a single thread (factory.setConcurrency("1"))
  • Disable instrumentation of the default spring executor (-Delastic.apm.classes_excluded_from_instrumentation=org.springframework.core.task.SimpleAsyncTaskExecutor)

Then I ran a simple test that sends and receives 10000 messages. Without these changes, "startTransaction" was called ~2500 times.
With either of those changes, "startTransaction" was called exactly 10001 times.
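
A toy model of why the "startTransaction" count could drop is sketched below. It only illustrates the assumption that a receive-loop thread holding an inherited transaction records each receive as a span instead of starting a new transaction. The class and method names are hypothetical, and the thread counts in `main` are made up for illustration; they were not measured in the test above.

```java
class ReceiveCountingSketch {
    int transactionsStarted;
    int spansOnLeakedTransactions;

    // Distributes messages round-robin over receive-loop threads. The first
    // leakedThreads threads are assumed to hold an inherited, never-deactivated transaction.
    static ReceiveCountingSketch simulate(int threads, int leakedThreads, int messages) {
        ReceiveCountingSketch result = new ReceiveCountingSketch();
        for (int i = 0; i < messages; i++) {
            int thread = i % threads;
            if (thread < leakedThreads) {
                // An active transaction already exists: the receive becomes one more span on it.
                result.spansOnLeakedTransactions++;
            } else {
                // No active transaction: the receive starts (and later ends) its own transaction.
                result.transactionsStarted++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed numbers: 4 consumer threads, 3 of them with a leaked transaction.
        ReceiveCountingSketch r = simulate(4, 3, 10000);
        System.out.println("transactions started: " + r.transactionsStarted);
        System.out.println("spans on leaked transactions: " + r.spansOnLeakedTransactions);
    }
}
```

With no leaked threads, every message starts its own transaction, which corresponds to the behavior seen after either change.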

@Svetlana_Nikitina and @bugy sorry, I only got the chance now to look at this :pray:
What an incredible description and analysis! :heart:

The analysis seems very interesting. I tend to agree. Refining your hypothesis:

The thing is that the next call to receive will deactivate and end the transaction on each of these threads, as long as its type is the temporary message-handling type. However, the first thread that encounters such an active transaction changes its type. As a result, the original message-handling thread sees a messaging type, does not deactivate the transaction, and keeps treating subsequent receive invocations as spans (see here).

I tried something based on your analysis. I will link a snapshot for testing once it's ready.
Thanks for the great input!!

Please try out this snapshot.

We will try it out soon and give our feedback
Thank you a lot for looking into this

I tried a different approach that I think should be superior. If you can test this one as well, it will be great.
Thanks!

Version 1.19.0 was released yesterday with a proposed fix for your issue.
You might as well try it out instead of the snapshots.

I've just released our code with the new version,
and the issue seems to be resolved.
Thank you!
