在SpringBoot中@JmsListener如何以及何时被调用?

kew*_*lwz 4 spring-jms spring-boot ibm-mq

我是 SpringBoot 的新手。尝试构建一个简单的非 Web 流程,在其中侦听 MQ 队列并处理收到的消息。我尝试了各种方法来在 SB 中实现此目的,但不幸的是我无法调用 @JmsListener 方法。也没有错误,过程只是等待。

所有 MQ 队列详细信息都在 application.properties 中

我确实验证了队列中有消息,并且可以使用旧的 MQ 接收器方式检索它们。

我想知道 @JmsListener Annotation 方法如何以及何时被调用?我确实尝试创建一个 JmsListenerContainerFactory 并将其包含在注释参数中,但没有什么区别。

类似的例子很少,看起来很简单,但我就是无法让它工作。任何建议表示赞赏。谢谢。

SpringBoot主类

@SpringBootApplication
@EnableJms
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
Run Code Online (Sandbox Code Playgroud)

MQListener 类

@Component
public class MQListener {
    @JmsListener(destination = "${mq.queueName}")
    public void receiveMessage(final Message message) throws JMSException{
        System.out.println("...Message Received...");
        String messageData = null;
        if(message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage)message;
            messageData = textMessage.getText();
        }
    }
}

Run Code Online (Sandbox Code Playgroud)

MQ配置类

@Configuration
public class MQConfiguration {
    @Value("${mq.host}")
    private String host;
    @Value("${mq.port}")
    private Integer port;
    @Value("${mq.queue-manager}")
    private String queueManager;
    @Value("${mq.channel}")
    private String channel;
    @Value("{mq.queueName}")
    private String queueName;
    @Value("${mq.receive-timeout}")
    private long receiveTimeout;
    
    @Bean
    public MQQueueConnectionFactory mqQueueConnectionFactory() {
        MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
        mqQueueConnectionFactory.setHostName(host);
        try {
            mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            mqQueueConnectionFactory.setChannel(channel);
            mqQueueConnectionFactory.setPort(port);
            mqQueueConnectionFactory.setQueueManager(queueManager);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return mqQueueConnectionFactory;
    }
}
Run Code Online (Sandbox Code Playgroud)

chu*_*hts 10

由于您使用的是 MQ 连接工厂的默认设置,因此您实际上并不需要它。相反,您可以使用 Spring Boot 为您创建的默认值。您也只需要一条短信,这样您就可以让 Spring 进行编组。在这种情况下,您所需要的只是从此示例派生的消息使用者 - https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Spring-JMS/src/main/java/com/ibm /mq/样本/jms/spring/level101

package ...

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer101 {
    protected final Log logger = LogFactory.getLog(getClass());

    @JmsListener(destination = "${mq.queueName}")
    public void receive(String message) {
        logger.info("");
        logger.info( this.getClass().getSimpleName());
        logger.info("Received message is: " + message);
    }
}
Run Code Online (Sandbox Code Playgroud)

application.properties由于您将让 Spring Boot 创建 MQ 容器,因此您需要在表单中提供设置-


# MQ Connection settings
ibm.mq.queueManager=QM1  
ibm.mq.channel=DEV.APP.SVRCONN
ibm.mq.connName=localhost(1414)


# Change the following lines as necessary. Set the ibm.mq.user
# property to an empty string to send no authentication request.
ibm.mq.user=app
ibm.mq.password=passw0rd
Run Code Online (Sandbox Code Playgroud)

您更有可能需要自定义侦听器而不是自定义连接工厂,但如果您确实想要配置与默认值不同的 ConnectionFactory,则使用从此示例派生的配置和消息使用者 - https://github.com/ibm -messaging/mq-dev-patterns/tree/master/Spring-JMS/src/main/java/com/ibm/mq/samples/jms/spring/level114

配置时,只需设置偏离默认的属性即可。


# MQ Connection settings
ibm.mq.queueManager=QM1  
ibm.mq.channel=DEV.APP.SVRCONN
ibm.mq.connName=localhost(1414)


# Change the following lines as necessary. Set the ibm.mq.user
# property to an empty string to send no authentication request.
ibm.mq.user=app
ibm.mq.password=passw0rd
Run Code Online (Sandbox Code Playgroud)

注:监听器容器工厂基于自定义的连接工厂。你需要这一步。您的消息消费者看起来像这样:

package ...

import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.samples.jms.spring.globals.handlers.OurDestinationResolver;
import com.ibm.mq.samples.jms.spring.globals.handlers.OurMessageConverter;
import com.ibm.mq.spring.boot.MQConfigurationProperties;
import com.ibm.mq.spring.boot.MQConnectionFactoryFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.QosSettings;


import javax.jms.DeliveryMode;
import javax.jms.JMSException;

@Configuration
public class MQConfiguration114 {
    protected final Log logger = LogFactory.getLog(getClass());

    @Bean
    public MQConnectionFactory mqConnectionFactory() throws JMSException {
        MQConfigurationProperties properties = new MQConfigurationProperties();
        // Properties will be a mix of defaults, and those found in application.properties
        // under ibm.mq
        // Here we can override any of the properties should we need to
        MQConnectionFactoryFactory mqcff = new MQConnectionFactoryFactory(properties,null);
        MQConnectionFactory mqcf = mqcff.createConnectionFactory(MQConnectionFactory.class);
        return mqcf;
    }

    @Bean
    public JmsListenerContainerFactory<?> myContainerFactory114() throws JMSException {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(mqConnectionFactory());
        factory.setPubSubDomain(false);

        factory.setMessageConverter(new OurMessageConverter());
        factory.setDestinationResolver(new OurDestinationResolver());

        // reply Qos
        QosSettings rQos = new QosSettings();
        rQos.setPriority(2);
        rQos.setTimeToLive(10000);
        rQos.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        factory.setReplyQosSettings(rQos);

        return factory;
    }

    @Bean("myNonJmsTemplate114")
    public JmsTemplate myNonJmsTemplate114() throws JMSException {
        JmsTemplate jmsTemplate = new JmsTemplate(mqConnectionFactory());
        jmsTemplate.setDestinationResolver(new OurDestinationResolver());
        jmsTemplate.setMessageConverter(new OurMessageConverter());

        return jmsTemplate;
    }

Run Code Online (Sandbox Code Playgroud)

如果您需要从 JMSMessage 对象执行自己的编组,则使用从此示例派生的消息使用者(您只需要一个使用者,不需要其他任何东西) - https://github.com/ibm-messaging/mq-dev-patterns/tree /master/Spring-JMS/src/main/java/com/ibm/mq/samples/jms/spring/level105

package ...

import com.ibm.mq.samples.jms.spring.globals.data.OurData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer114 {
    protected final Log logger = LogFactory.getLog(getClass());

    @JmsListener(destination = "${mq.queueName}", containerFactory = "myContainerFactory114")
    public void receiveRequest(OurData message) {
        logger.info("");
        logger.info( this.getClass().getSimpleName());
        logger.info("Received message of type: " + message.getClass().getSimpleName());
        logger.info("Received message :" + message);
    }
}
Run Code Online (Sandbox Code Playgroud)

在哪里

package ...

import javax.jms.*;

import com.ibm.mq.samples.jms.spring.globals.Constants;
import com.ibm.mq.samples.jms.spring.globals.data.OurData;
import com.ibm.mq.samples.jms.spring.globals.data.OurOtherData;
import com.ibm.mq.samples.jms.spring.globals.utils.MessageUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;

import java.io.Serializable;


@Component
public class MessageConsumer105 {
    protected final Log logger = LogFactory.getLog(getClass());

    @JmsListener(destination = "${app.l105.queue.name2}")
    public void receiveData(Message message) {
        logger.info("");
        logger.info( this.getClass().getSimpleName());
        logger.info("Received message of type: " + message.getClass().getSimpleName());
        if (null != message) {
            MessageUtils.checkMessageType(message);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

如果您的消费者和配置类具有@Component@Configuration注释并且位于同一包系列中,那么它们将会被找到,如果没有,那么您需要向应用程序添加更多注释以使 Spring 拾取它们。例如。

package ...

import com.ibm.mq.samples.jms.spring.globals.Constants;
import com.ibm.mq.samples.jms.spring.globals.data.OurData;
import com.ibm.mq.samples.jms.spring.globals.data.OurOtherData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.jms.*;
import java.io.Serializable;
import java.util.Map;

public class MessageUtils {
    protected static final Log logger = LogFactory.getLog(MessageUtils.class);

    private MessageUtils () {}

    public static void checkMessageType(Message message) {
        try {
            if (message instanceof TextMessage) {
                logger.info("Message matches TextMessage");
                logger.info("message payload is " + ((TextMessage) message).getText());
            } else if (message instanceof BytesMessage) {
                logger.info("Message matches BytesMessage");
            } else if (message instanceof MapMessage) {
                logger.info("Message matches MapMessage");
            } else if (message instanceof StreamMessage) {
                logger.info("Message matches StreamMessage");
            } else if (message instanceof ObjectMessage) {
                checkForObject((ObjectMessage) message);
            }
        } catch (JMSException e) {
            logger.warn("Unable to process JMS message");
        }
    }

    public static void logHeaders(Map<String, Object> msgHeaders) {
        if (! msgHeaders.isEmpty() ) {
            logger.info("");
            logger.info("Headers found");
            msgHeaders.forEach((k, v) -> {
                logger.info(k + ": is of type" + v.getClass());
            });
        }
    }

    private static void checkForObject(ObjectMessage message) {
        try {
            int typeValue = message.getIntProperty(Constants.DATATYPE);
            if (Constants.DataTypes.OURDATATYPE.getValue() == typeValue) {
                logger.info("It is one of our objects");
                Serializable serObj = message.getObject();
                OurData data = (OurData) serObj;
                logger.info(data);
            } else if (Constants.DataTypes.OUROTHERDATATYPE.getValue() == typeValue) {
                logger.info("It is one of our other objects");
                Serializable serObj = message.getObject();
                OurOtherData data = (OurOtherData) serObj;
                logger.info(data);
            } else {
                logger.warn("It is not one of our objects");
            }
        } catch (JMSException e) {
            logger.warn("Unable to retrieve message data");
        } catch (ClassCastException e2) {
            logger.warn("Not the object we were expecting");
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

如果你没有

spring.jms.listener.auto-startup=false 
Run Code Online (Sandbox Code Playgroud)

在您的 application.properties 文件中,所有侦听器将在应用程序启动时自动启动。

您需要将名称命名mq-jms-spring-boot-starter为依赖项。例如。如果使用maven:

        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>mq-jms-spring-boot-starter</artifactId>
            <version>2.4.1</version>
        </dependency>
Run Code Online (Sandbox Code Playgroud)

如果这是唯一的 JMS 实现者,那么 Spring Boot 将能够确定您的所有侦听器都在使用 IBM MQ。如果您将其他 JMS 提供程序列为依赖项,则需要明确告知 Spring 要使用哪个连接工厂。有一个示例 pom.xml,仅包含示例中所需的依赖项,网址为https://github.com/ibm-messaging/mq-dev-patterns/blob/master/Spring-JMS/pom.xml

尝试https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Spring-JMS上的 101 示例 它基于 Maven,仅列出了所需的依赖项pom.xml,您需要做的就是更新https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Spring-JMS/src/main/resourcesapplication.properties中的以指向您的 MQ 服务器。