Krz*_*slo 7 spring transactions jms jmstemplate
我试图使用Spring JMSTemplate.receive(String)方法以同步模式从队列中获取所有消息.
问题是我总是只收到一条消息.这是代码:
@Transactional
public List<Message> receiveAllFromQueue(String destination) {
List<Message> messages = new ArrayList<Message>();
Message message;
while ((message = queueJmsTemplate.receive(destination)) != null) {
messages.add(message);
}
return messages;
}
Run Code Online (Sandbox Code Playgroud)
如果我删除了@Transactional注释,我会收到所有消息,但所有消息都是在事务中完成的,所以如果稍后在处理这些消息时会有一个例外,消息将会丢失.
这是我的JMSTemplate bean的定义.
<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="pubSubDomain" value="false" />
<property name="receiveTimeout" value="1" />
<property name="sessionTransacted" value="true" />
</bean>
Run Code Online (Sandbox Code Playgroud)
我想要实现的是拥有一个事务,并且在此事务中我想获取所有待处理的消息.
小智 7
JmsTemplate 的接收方法每次都会创建一个新的 MessageConsumer。第二次,您的事务尚未提交,Spring 将在第一次接收期间预取一些消息。那时没有消息要提取,导致您的接收呼叫为空。
Spring 中的 JmsTemplate 有一个以 SessionCallback 作为参数的 execute 方法。这允许您针对 JmsTemplate 的底层会话运行您自己的代码。仅创建一个 MessageConsumer 应该可以解决您的问题。
@Transactional
public List<Message> receiveAllFromQueue(String destination) {
return jmsTemplate.execute(session -> {
try (final MessageConsumer consumer = session.createConsumer(session.createQueue(destination))) {
List<Message> messages = new ArrayList<>();
Message message;
while ((message = consumer.receiveNoWait()) != null) {
messages.add(message);
}
return messages;
}
}, true);
}
Run Code Online (Sandbox Code Playgroud)
我会回复自己。看来JMSTemplate不支持它。临时解决此问题的唯一方法是扩展JMSTemplate并添加使用JMSTemplate的某些部分的新方法。不幸的是,某些方法是私有的,因此需要将其复制...
public class CustomQueueJmsTemplate extends JmsTemplateDelegate {
public List<Message> receiveAll(String destinationName) {
return receiveAll(destinationName, null);
}
public List<Message> receiveAll(final String destinationName, final String messageSelector) {
return execute(new SessionCallback<List<Message>>() {
@Override
public List<Message> doInJms(Session session) throws JMSException {
Destination destination = resolveDestinationName(session, destinationName);
return doReceiveAll(session, destination, messageSelector);
}
}, true);
}
private List<Message> doReceiveAll(Session session, Destination destination, String messageSelector)
throws JMSException
{
return doReceiveAll(session, createConsumer(session, destination, messageSelector));
}
private List<Message> doReceiveAll(Session session, MessageConsumer consumer) throws JMSException {
try {
// Use transaction timeout (if available).
long timeout = getReceiveTimeout();
JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
.getResource(getConnectionFactory());
if (resourceHolder != null && resourceHolder.hasTimeout()) {
timeout = resourceHolder.getTimeToLiveInMillis();
}
// START OF MODIFIED CODE
List<Message> messages = new ArrayList<>();
Message message;
while ((message = doReceive(consumer, timeout)) != null) {
messages.add(message);
}
// END OF MODIFIED CODE
if (session.getTransacted()) {
// Commit necessary - but avoid commit call within a JTA transaction.
if (isSessionLocallyTransacted(session)) {
// Transacted session created by this template -> commit.
JmsUtils.commitIfNecessary(session);
}
} else if (isClientAcknowledge(session)) {
// Manually acknowledge message, if any.
for (Message retrievedMessages : messages) {
retrievedMessages.acknowledge();
}
}
return messages;
}
finally {
JmsUtils.closeMessageConsumer(consumer);
}
}
private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {
if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
return consumer.receiveNoWait();
} else if (timeout > 0) {
return consumer.receive(timeout);
} else {
return consumer.receive();
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6868 次 |
| 最近记录: |