Spring Batch - 并非所有记录都是从MQ检索中处理的

Nat*_*ynn 5 java spring activemq-classic spring-batch ibm-mq

我对Spring和Spring Batch相当新,所以如果你有任何问题,请随时提出任何澄清问题.

我发现Spring Batch存在一个问题,我无法在测试或本地环境中重新创建.我们有一个日常工作,通过JMS连接到Websphere MQ并检索一组记录.此作业使用开箱即用的JMS ItemReader.我们实现了自己的ItemProcessor,但它除了记录之外没有做任何特殊的事情.没有应该影响传入记录的过滤器或处理.

问题是,在MQ的每日10,000多条记录中,只有大约700个左右(每次确切的数字不同)通常会记录在ItemProcessor中.所有记录都已成功从队列中删除.记录的记录数每次都不同,似乎没有模式.通过将日志文件与MQ中的记录列表进行比较,我们可以看到一个看似随机的记录子集正在被我们的工作"处理".可能会拾取第一条记录,然后跳过50条,然后连续5条等等.每次作业运行时,模式都不同.也没有记录异常.

在localhost中运行相同的应用程序并使用相同的数据集进行测试时,ItemProcessor将成功检索并记录所有10,000多条记录.该作业在生产中运行20到40秒(也不是常数),但在测试和本地需要几分钟才能完成(这显然是有意义的,因为它处理了更多的记录).

因此,这是解决问题的难题之一,因为我们无法重新创建它.一个想法是实现我们自己的ItemReader并添加额外的日志记录,以便我们可以看到记录是否在读者之前或读者之后丢失 - 我们现在知道的是ItemProcessor只处理了一部分记录.但即使这样也无法解决我们的问题,并且考虑到它甚至不是解决方案,实施起来会有些及时.

还有其他人看过像这样的问题吗?任何可能的想法或疑难解答建议将不胜感激.以下是我们用于参考的一些jar版本号.

  • 春天 - 3.0.5.RELEASE
  • Spring Integration - 2.0.3.RELEASE
  • Spring Batch - 2.1.7.RELEASE
  • 活动MQ - 5.4.2
  • Websphere MQ - 7.0.1

提前感谢您的意见.

编辑:每个请求,处理器的代码:

public SMSReminderRow process(Message message) throws Exception {

    SMSReminderRow retVal = new SMSReminderRow();
    LOGGER.debug("Converting JMS Message to ClaimNotification");
    ClaimNotification notification = createClaimNotificationFromMessage(message);

    retVal.setShortCode(BatchCommonUtils
            .parseShortCodeFromCorpEntCode(notification.getCorpEntCode()));
    retVal.setUuid(UUID.randomUUID().toString());
    retVal.setPhoneNumber(notification.getPhoneNumber());
    retVal.setMessageType(EventCode.SMS_CLAIMS_NOTIFY.toString());

    DCRContent content = tsContentHelper.getTSContent(Calendar
            .getInstance().getTime(),
            BatchCommonConstants.TS_TAG_CLAIMS_NOTIFY,
            BatchCommonConstants.TS_TAG_SMSTEXT_TYP);

    String claimsNotificationMessage = formatMessageToSend(content.getContent(),
            notification.getCorpEntCode());

    retVal.setMessageToSend(claimsNotificationMessage);
    retVal.setDateTimeToSend(TimeUtils
            .getGMTDateTimeStringForDate(new Date()));

    LOGGER.debug(
            "Finished processing claim notification for {}. Writing row to file.",
            notification.getPhoneNumber());
    return retVal;
}
Run Code Online (Sandbox Code Playgroud)

JMS配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<bean id="claimsQueueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="jms/SMSClaimNotificationCF" />
    <property name="lookupOnStartup" value="true" />
    <property name="cache" value="true" />
    <property name="proxyInterface" value="javax.jms.ConnectionFactory" />
</bean>

<bean id="jmsDestinationResolver"
    class="org.springframework.jms.support.destination.DynamicDestinationResolver">
</bean>

<bean id="jmsJndiDestResolver" 
    class=" org.springframework.jms.support.destination.JndiDestinationResolver"/>  

<bean id="claimsJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="claimsQueueConnectionFactory" />
    <property name="defaultDestinationName" value="jms/SMSClaimNotificationQueue" />
    <property name="destinationResolver" ref="jmsJndiDestResolver" />
    <property name="pubSubDomain">
        <value>false</value>
    </property>
    <property name="receiveTimeout">
        <value>20000</value>
    </property>
</bean>
Run Code Online (Sandbox Code Playgroud)

poj*_*guy 1

请参阅http://activemq.apache.org/jmstemplate-gotchas.html

使用 JMSTemplate 时存在问题。我只是在升级硬件并突然暴露出预先存在的竞争条件时才遇到这些问题。

简而言之,根据设计和意图,JMS 模板会在每次调用时打开和关闭连接。它不会看到早于其创建的消息。在大容量和/或高吞吐量场景中,它将无法读取某些消息。