And here is the configuration of the connection itself:
import com.application.infrastructure.events.activemq.ActiveMqEventMessageConverter;
import com.application.infrastructure.events.serializer.JsonEventSerializer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Named;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;
/**
 * Spring configuration that wires the ActiveMQ JMS infrastructure: the broker
 * connection factory with its redelivery policy, a caching connection factory,
 * the event message converter, a {@link JmsTemplate} for sending and a
 * {@link DefaultJmsListenerContainerFactory} for consuming.
 *
 * <p>The class is intentionally not {@code final}: bean methods call each other
 * directly and rely on Spring's {@code @Configuration} proxying to return the
 * shared bean instances.
 */
@Configuration
public class ActiveMqJmsConfig {

    /** Bean name of the listener container factory. */
    public static final String ACTIVE_MQ_CONTAINER_FACTORY = "ActiveMqContainerFactory";

    /** Bean name of the message converter. */
    public static final String ACTIVE_MQ_MESSAGE_CONVERTER = "ActiveMqMessageConverter";

    /** Bean name of the {@link JmsTemplate}. */
    public static final String ACTIVE_MQ_TEMPLATE = "ActiveMqTemplate";

    private static final Logger LOGGER = Logger.getLogger(ActiveMqJmsConfig.class.getName());

    /** Maximum number of redelivery attempts configured on the redelivery policy. */
    private static final int MAXIMUM_REDELIVERIES = 5;

    /** Listener concurrency limits in Spring's "lower-upper" notation. */
    private static final String LISTENER_CONCURRENCY = "2-8";

    /**
     * JMS session acknowledge mode used by the listener containers. The value
     * {@code 2} is the JMS constant {@code javax.jms.Session.CLIENT_ACKNOWLEDGE}.
     */
    private static final int CLIENT_ACKNOWLEDGE_MODE = 2;

    private final String url;
    private final String username;
    private final String password;

    /**
     * Creates the configuration from the broker connection properties.
     *
     * @param url      value of {@code spring.activemq.broker-url}
     * @param username value of {@code spring.activemq.user}
     * @param password value of {@code spring.activemq.password}
     */
    public ActiveMqJmsConfig(
            @Value("${spring.activemq.broker-url}") String url,
            @Value("${spring.activemq.user}") String username,
            @Value("${spring.activemq.password}") String password) {
        this.url = url;
        this.username = username;
        this.password = password;
    }

    /**
     * Builds the redelivery policy: exponential back-off enabled and at most
     * {@value #MAXIMUM_REDELIVERIES} redeliveries.
     *
     * @return the redelivery policy applied to the ActiveMQ connection factory
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setUseExponentialBackOff(true);
        redeliveryPolicy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);
        return redeliveryPolicy;
    }

    /**
     * Builds the native ActiveMQ connection factory from the configured broker
     * URL and credentials, with the redelivery policy attached and non-blocking
     * redelivery enabled.
     *
     * @return the native ActiveMQ connection factory
     */
    @Bean
    public ActiveMQConnectionFactory activeMqConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(this.url);
        connectionFactory.setUserName(this.username);
        connectionFactory.setPassword(this.password);
        connectionFactory.setRedeliveryPolicy(this.redeliveryPolicy());
        connectionFactory.setNonBlockingRedelivery(true);
        return connectionFactory;
    }

    /**
     * Wraps the native ActiveMQ connection factory in a
     * {@link CachingConnectionFactory}.
     *
     * @return the caching connection factory used by the {@link JmsTemplate}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(this.activeMqConnectionFactory());
    }

    /**
     * Creates the message converter backed by the given event serializer.
     *
     * @param eventSerializer serializer handed to the converter
     * @return the ActiveMQ event message converter
     */
    @Bean(name = ACTIVE_MQ_MESSAGE_CONVERTER)
    public MessageConverter messageConverter(JsonEventSerializer eventSerializer) {
        return new ActiveMqEventMessageConverter(eventSerializer);
    }

    /**
     * Creates the {@link JmsTemplate} using the caching connection factory and
     * the ActiveMQ message converter.
     *
     * @param messageConverter the converter bean named
     *                         {@value #ACTIVE_MQ_MESSAGE_CONVERTER}
     * @return the configured template
     */
    @Bean(name = ACTIVE_MQ_TEMPLATE)
    public JmsTemplate jmsTemplate(
            @Named(ACTIVE_MQ_MESSAGE_CONVERTER) MessageConverter messageConverter) {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(this.connectionFactory());
        template.setMessageConverter(messageConverter);
        return template;
    }

    /**
     * Creates the listener container factory with concurrency
     * {@value #LISTENER_CONCURRENCY}, the ActiveMQ message converter, client
     * acknowledge mode and an error handler that logs listener failures.
     *
     * @param messageConverter the converter bean named
     *                         {@value #ACTIVE_MQ_MESSAGE_CONVERTER}
     * @return the configured listener container factory
     */
    @Bean(name = ACTIVE_MQ_CONTAINER_FACTORY)
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            @Named(ACTIVE_MQ_MESSAGE_CONVERTER) MessageConverter messageConverter) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // Use the native factory here rather than the CachingConnectionFactory:
        // Spring's DefaultMessageListenerContainer documentation advises against
        // combining CachingConnectionFactory with dynamic scaling (a concurrency
        // range), because the container manages its own caching.
        factory.setConnectionFactory(this.activeMqConnectionFactory());
        factory.setConcurrency(LISTENER_CONCURRENCY);
        factory.setMessageConverter(messageConverter);
        // Log listener failures instead of silently discarding them.
        factory.setErrorHandler(throwable ->
                LOGGER.log(Level.WARNING, "Error while processing ActiveMQ JMS message", throwable));
        factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE_MODE);
        return factory;
    }
}