Kibana version: 7.9.3
Elasticsearch version: 7.9.3
APM Server version: 7.9.3
APM Agent language and version: Java 1.22
Browser version: -
Original install method: Cloud ELK
Description of the problem including expected versus actual behavior:
Hi!
We try to get insights into producing and consuming messages via RMQ and therefore updated to Agent version 1.22 but are missing any output related to that. REST requests that result in producing a message are monitored and shown but we are not able to see the call to the producer nor the consumer altough messages are consumed and we see the result (updates in DB).
We use a customized SimpleRabbitListenerContainerFactory maybe it is related to that and someone could give us a hint how to get it running.
Thanks in advance
Rainer
Rabbit MQ configuration:
@EnableRabbit
@Configuration
public class RabbitMqConfig {
...
@Bean
public RetryOperationsInterceptor listenerRetryInterceptor() {
return RetryInterceptorBuilder.stateless()
.backOffOptions(1000, 3.0, 10000)
.maxAttempts(3)
.recoverer(new ImmediateRequeueMessageRecoverer())
.build();
}
@Bean
public SimpleRabbitListenerContainerFactory listenerRetryContainerFactory(
ConnectionFactory connectionFactory, RetryOperationsInterceptor listenerRetryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
Advice[] adviceChain = { listenerRetryInterceptor };
factory.setAdviceChain(adviceChain);
return factory;
}
}
Listener:
@Component
public class SubOrderStatusUpdateEventListener {
....
@RabbitListener(queues = "${mq.listener.suborder-status-update.queue}", containerFactory = "listenerRetryContainerFactory")
public void consumeBlocking(Message incomingMessage) throws Exception {
Stopwatch sw = Stopwatch.createStarted();
EventListenerResult serviceResult = this.eventListenerService.updateSubOrderStatus(incomingMessage);
if (serviceResult.isTemporaryError()) {
throw new Exception("temporary error, retry later");
} else if (serviceResult.isPermanentError()) {
putIntoDeadLetterQueue(incomingMessage);
}
sw.stop();
StructuredLogger.logInfoMessage(LogMessage.PROCESSING_TIME_FOR_MESSAGE, Map.of("TIME_MILLIS", Long.toString(sw.elapsed(TimeUnit.MILLISECONDS)), "MESSAGE_ID", retrieveMessageId(incomingMessage).orElse("noMessageId")));
}
}