将RedeliveryPolicy注入Spring引导ActiveMQ

Nik*_*ita 3 configuration spring activemq-classic jms spring-boot

我使用的是spring-boot-starter-activemqPoolFactory.一切都很好,并且很容易通过配置application.yaml,但我找不到正确的方式来注入我的自定义RedeliveryPolicy.所以问题是我该怎么做?

小智 6

我已经尝试过@Iulian Rosca 的答案,但它不适用于最新版本的 spring-boot-starter-activemq(我使用的是 2.2.0.RELEASE 版本)。

Spring 为 提供了许多其他的实现类ConnectionFactory,例如CachingConnectionFactory, JmsPooledConnectionFactory,... 并且它们之间不会相互继承。因此,上述解决方案需要对许多类类型进行重构。

我发现 Spring 提供了一个功能接口ActiveMQConnectionFactoryCustomizer,它会在创建 ActiveMQ 连接工厂时注入,无论哪个类。它将自定义连接工厂类,您可以设置任何属性,而不仅仅是重新传递策略。

这是我的示例代码:

@Configuration
public class RedeliveryCofiguration {

    @Bean
    public ActiveMQConnectionFactoryCustomizer configureRedeliveryPolicy() {
        return connectionFactory ->
        {
            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
            // configure redelivery policy
            connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        };
    }
}
Run Code Online (Sandbox Code Playgroud)


Iul*_*sca 5

可以在连接工厂上设置重新传递策略.由于弹簧引导自动配置连接工厂,您可以添加一个方法来设置它.

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Foo
{
    @Bean
    public InitializingBean connectionFactory(ActiveMQConnectionFactory connectionFactory)
    {
        return () ->
        {
            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
            // configure redelivery policy
            connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        };
    }
}
Run Code Online (Sandbox Code Playgroud)

更新

如一个回复中所述,当未设置以下配置时,上述解决方案很有效spring.activemq.pool.enabled = true.

使用池连接工厂时,Spring Boot会自动配置a org.apache.activemq.pool.PooledConnectionFactory而不是org.apache.activemq.ActiveMQConnectionFactory.此配置在此处发生:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.PooledConnectionFactoryConfiguration#pooledJmsConnectionFactory在以下行中:

PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(new ActiveMQConnectionFactoryFactory(properties,factoryCustomizers.getIfAvailable()).createConnectionFactory(ActiveMQConnectionFactory.class));
Run Code Online (Sandbox Code Playgroud)

为了完成这项工作,您有两种选择:

  1. 使用了共同的父ActiveMQConnectionFactoryPooledConnectionFactory注入时豆,这是javax.jms.ConnectionFactory,用实例和铸造设置重新传递政策.

        @Bean
        public InitializingBean connectionFactory(ConnectionFactory connectionFactory) {
    
            if (connectionFactory instanceof ActiveMQConnectionFactory) {
                return configureRedeliveryPolicy((ActiveMQConnectionFactory) connectionFactory);
    
            } else if (connectionFactory instanceof PooledConnectionFactory) {
                final PooledConnectionFactory pooledConnectionFactory = (PooledConnectionFactory) connectionFactory;
                if (pooledConnectionFactory.getConnectionFactory() instanceof ActiveMQConnectionFactory) {
                    return configureRedeliveryPolicy((ActiveMQConnectionFactory) pooledConnectionFactory.getConnectionFactory());
                }
            }
            // ...
        }
    
    Run Code Online (Sandbox Code Playgroud)
  2. 为每种bean类型提供两种不同的自定义初始化方案,它们基本上与1相同,但是您要利用ConditionalOnBean而不是检查connectionFactory类类型.

    @Configuration
    public class Foo {
        @Bean
        @ConditionalOnBean(ActiveMQConnectionFactory.class)
        public InitializingBean connectionFactory(ActiveMQConnectionFactory connectionFactory) {
            return configureRedeliveryPolicy(connectionFactory);
        }
    
        @Bean
        @ConditionalOnBean(PooledConnectionFactory.class)
        public InitializingBean pooledConnectionFactory(PooledConnectionFactory connectionFactory) {
            if (connectionFactory.getConnectionFactory() instanceof ActiveMQConnectionFactory) {
                return configureRedeliveryPolicy((ActiveMQConnectionFactory) connectionFactory.getConnectionFactory());
            } else return () -> {
                // do something else
            };
        }
    
        private InitializingBean configureRedeliveryPolicy(ActiveMQConnectionFactory connectionFactory) {
            return () ->
            {
                RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
                // configure redelivery policy
                connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
            };
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)