在下面的测试中,我试图模拟以下场景:
但是我的测试失败了,并且消息没有重新传递给新的消费者.我会很感激任何提示.
MessageProcessingFailureAndReprocessingTest.java
@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests {
@Autowired
private FailureReprocessTestScenario testScenario;
@Before
public void setUp() {
testScenario.start();
}
@After
public void tearDown() throws Exception {
testScenario.stop();
}
@Test public void
should_reprocess_task_after_processing_failure() {
try {
Thread.sleep(20*1000);
assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{
"task-1",
})));
} catch (InterruptedException e) {
fail();
}
}
@Configurable
public static class FailureReprocessTestScenario {
@Autowired
public BrokerService broker;
@Autowired
public MockTaskProducer mockTaskProducer;
@Autowired
public FailingWorker failingWorker;
@Autowired
public SucceedingWorker succeedingWorker; …
Run Code Online (Sandbox Code Playgroud) 我正在使用spring和JMS编写一个新的应用程序.Myintent将使用Spring的异步接收 - 消息驱动的POJO.我收到以下错误.
org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful[904] -
Could not refresh JMS Connection for destination
'xyz_Module!xyz_Queue' - retrying in 5000 ms. Cause:
[Security:090398]Invalid Subject: principals=[user, groupa, groupb]
java.lang.SecurityException: [Security:090398]Invalid Subject: principals=[user, groupa, groupb]
Run Code Online (Sandbox Code Playgroud)
我有以下春季定义
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="destination" />
<property name="messageListener" ref="messageAdapter" />
</bean>
<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="jms/xxxxx" />
<property name="resourceRef" value="true"></property>
</bean>
<bean id="destination" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="jms/yyyyy" />
</bean>
<bean id="messageAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="messageReceiverDelegate" />
<property name="defaultListenerMethod" value="receive" />
</bean>
Run Code Online (Sandbox Code Playgroud)
我在几篇文章中读到了我们必须启用跨域安全性的信息.我不相信我们需要启用跨域安全性,因为我公司中的几个其他MDB应用程序通过从远程队列读取工作正常.这些应用程序在其weblogic服务器中未启用跨域安全性.
我花了一些时间挖掘并且发了一些财富我发现如果在部署应用程序后重新启动weblogic服务器,我就不会再看到安全性错误了.我能够收到远程队列中的消息通知. …
我正在尝试使用Spring Integration和ActiveMQ Message Broker配置JMS.我的出站通道应该由JDBC消息存储支持,以防止数据丢失,例如代理或我的应用程序脱机.
我的配置似乎到目前为止工作,但JDBC消息存储不像我期望的那样.如果我断开代理断开连接,发送到出站通道的消息将按预期保留,但在重新连接后,它们将保留在DB中并且不会发送到队列.但是,重新连接后发送的其他消息到达队列,如果我重新启动应用程序,最后也会发送持久消息...
应用程序的context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:oxm="http://www.springframework.org/schema/oxm"
xmlns:j2ee="http://www.springframework.org/schema/jee"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- ActiveMQ Configuration -->
<jms:annotation-driven/>
<int:annotation-config/>
<!-- Factory for ActiveMQ connections from JNDI -->
<j2ee:jndi-lookup jndi-name="java:comp/env/jms/connectionFactory"
id="jmsConnectionFactory"
expected-type="org.apache.activemq.ActiveMQConnectionFactory"
proxy-interface="javax.jms.ConnectionFactory"
lookup-on-startup="false"
resource-ref="true"
cache="true"/>
<bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="maxConnections" value="8"/>
<property name="reconnectOnException" value="true"/>
</bean>
<!-- JAXB-marshaller from spring-oxm used to handle XML-payload of messages --> …
Run Code Online (Sandbox Code Playgroud) 我正在进入一个实现IBM MQ侦听Spring JMS应用程序的项目,并且在理解DefaultMessageListenerContainer中的“ receiveTimeout”时遇到了麻烦。
与Internet上的来源相比,我认为我的项目有点特殊,我们为“ receiveTimeout”参数使用了非常高的30秒值,我不知道这实际上意味着什么。
我试图弄清楚“ receiveTimeout”参数的含义,在Spring配置之后,我将在下面提供您的理解。
仅供参考:我们正在从队列中读取/处理许多非常小的消息(大约100kb)。
这是我们正在使用的spring配置:
<bean id="msgListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer"
p:connectionFactory-ref="mqConnectionFactory"
p:messageListener-ref="myMessageListener" p:sessionTransacted="true"
p:concurrentConsumers="1" p:maxConcurrentConsumers="20"
p:receiveTimeout="30000" p:idleTaskExecutionLimit="10"
p:idleConsumerLimit="5" />
Run Code Online (Sandbox Code Playgroud)
如果有人想知道不同的参数,这就是我在整个互联网上收集到的信息:
该idleConsumerLimit属性用于指定消费者被允许被闲置在给定时间的最大数量。增加此限制会导致更积极地创建调用程序。这对于加快消费者数量很有用。
idleTaskExecutionLimit:接收任务允许的空闲执行次数的限制。缺省值为1,导致一旦任务未收到消息,空闲资源将尽早关闭。idleTaskExecutionLimit属性设置为10,以允许任务执行10次,而不是默认值1。
receiveTimeout属性设置为30秒,以告知DMLC的接收操作轮询消息30秒,而不是默认的1秒。
这是我的理解:
因此,这意味着:如果负载很大,Spring JMS将启动多达20个使用者(maxConcurrentConsumers),并且一旦负载下降,这些使用者将继续读取消息30秒钟(receiveTimeout),然后关闭或变为空闲状态。因此,此后5个使用者(idleConsumerLimit)在关闭之前仍将闲置10秒钟(?)(idleTaskExecutionLimit)。
如果我错了,请纠正我。
一个互联网页面说我的消费者每30秒只会阅读一条消息,但我认为这不是对“ receiveTimeout”的正确解释。
当前存在的一个问题是,我们从MQ读取了许多GET,但实际上没有收到消息-就像我们可以拥有60'000 GET的一样,它们实际上能够读取消息,而发生的错误为2'100'000 GET。但没有阅读消息。
感谢您对更好地了解Spring JMS行为的帮助。
我想写下面的测试;
有一个叫听者state-info-1
在src/main
.
它对它获得的任何消息进行一些更改,并在activemq主题上发布新消息state-info-2
.
我将构建一个虚拟消息并发布到activemq主题state-info-1
.
最后验证,收到的关于主题的消息state-info-2
就像我预期的那样.
我的听众就像;
@JmsListener(destination = "state-info-1", containerFactory = "connFactory")
public void receiveMessage(Message payload) {
// Do Stuff and Publish to state-info-2
}
Run Code Online (Sandbox Code Playgroud)
我可以为此写测试吗?或者我必须以其他方式做到这一点?
另外,我看了这个:https://github.com/spring-projects/spring-boot/blob/master/spring-boot-samples/spring-boot-sample-activemq/src/test/java/sample/activemq /SampleActiveMqTests.java
但这不是我所期待的.
任何帮助或推动正确的方向就足够了.
感谢您的时间.
我将 RabbitMq(带有 JMS)与 jmsTemplate 一起使用,我能够使用来自 RabbitMq 队列的消息,但它正在自动进行确认。
我有它的搜索 API,但无法找到它。
如何设置手动确认。
在下面的代码中,当从队列中使用消息时,我想使用该消息调用 Web 服务,并取决于来自我想从队列中删除该消息的响应。我创建了一个项目,在其中使用监听器和其他项目,并调用从队列中读取消息
第一个项目:
package com.es.jms.listener;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.jms.listener.SimpleMessageListenerContainer;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
@Configuration
public class RabbitMqMessageListener {
@Bean
public ConnectionFactory jmsConnectionFactory() {
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setUsername("Username");
connectionFactory.setPassword("Password");
connectionFactory.setVirtualHost("vhostname");
connectionFactory.setHost("hostname");
return connectionFactory;
}
@Bean
public MessageListener msgListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println(message.toString());
if (message instanceof TextMessage) {
try { …
Run Code Online (Sandbox Code Playgroud) 我希望能够从application.properties设置@JMSlistener目标
我的代码看起来像这样
@Service
public class ListenerService {
private Logger log = Logger.getLogger(ListenerService.class);
@Autowired
QueueProperties queueProperties;
public ListenerService(QueueProperties queueProperties) {
this.queueProperties = queueProperties;
}
@JmsListener(destination = queueProperties.getQueueName() )
public void listenQueue(String requestJSON) throws JMSException {
log.info("Received " + requestJSON);
}
}
Run Code Online (Sandbox Code Playgroud)
但是当我建造时
Error:(25, 60) java: element value must be a constant expression
Run Code Online (Sandbox Code Playgroud) 因此,我在春季jms 50-100使用并发,允许最多连接高达200.一切都按预期工作但如果我尝试从队列中检索100k消息,我的意思是我的sqs上有100k消息,我通过弹簧读取它们jms正常的方法.
@JmsListener
Public void process (String message) {
count++;
Println (count);
//code
}
Run Code Online (Sandbox Code Playgroud)
我在控制台中看到了所有日志,但是在大约17k之后,它开始抛出异常
像:aws sdk异常:端口已经在使用中.
为什么我会看到这个例外,怎么做.我摆脱它?
我试着在互联网上寻找它.找不到任何东西.
我的设定:
并发50-100
为每个任务设置消息:50
客户承认
timestamp=10:27:57.183, level=WARN , logger=c.a.s.j.SQSMessageConsumerPrefetch, message={ConsumerPrefetchThread-30} Encountered exception during receive in ConsumerPrefetch thread,
javax.jms.JMSException: AmazonClientException: receiveMessage.
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:422)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:339)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:248)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:207)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Address already in use: connect
Run Code Online (Sandbox Code Playgroud)
更新:我找到了问题,似乎正在创建新的套接字,直到每个套接字都耗尽.
我的春季jms版本是4.3.10
要复制此问题,只需执行以上配置,最大连接为200,货币设置为50-100,并将大约40k消息推送到sqs队列.可以使用https://github.com/adamw/elasticmq作为本地堆栈服务器复制亚马逊sqs ..完成后直到这里.注释jms监听器并使用soap ui加载测试并调用send消息来触发许多消息.仅仅因为你评论了@jmslistener注释,它就不会消耗来自队列的消息.一旦您看到已发送40k消息,请停止.取消注释@jmslistener并重新启动服务器.
更新:
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new …
Run Code Online (Sandbox Code Playgroud) 这是在创建持久订阅的背景下。DefaultMessageListenerContainer
中有一个 setClientId() , SingleConnectionFactory 中有另一个setClientId() 。
我的理解是:
因此,在 ListenerContainer 处 setClientId() 是有意义的。
但是,为什么在连接工厂级别会有 setClientId() ?
即使 SingleConnectionFactory 只有一个连接,该连接也可以由多个使用者跨多个会话共享。正确的 ?不用说,这对于 CachingConnectionFactory(从 SingleConnectionFactory 继承此方法)来说更危险。
扩展版本: 我们可以说不应该在 Single/CachingConnectionFactory 上使用 setClientId() 吗?DefaultMessageListenerContainer 的 setClientId() 中的以下语句使这一点变得更加必要:
此外,只有在原始 ConnectionFactory 尚未分配客户端 ID 的情况下,才能分配客户端 ID
因此,如果有人不小心在 CachingConnectionFactory 上设置了 ClientId,则将来在 DefaultMessageListenerContainer 上设置的客户端 id 将无法执行!
在我的应用程序中运行集成测试时,我在我的集成测试之一的故障安全测试报告中收到以下错误消息:
listener.DefaultMessageListenerContainer,WARN,Setup of JMS message listener invoker failed for destination 'jms/myapp.OneWorker' - trying to recover. Cause: Destination [jms/myapp.OneWorker] not found in JNDI; nested exception is javax.naming.NameNotFoundException: jms/myapp.OneWorker
Run Code Online (Sandbox Code Playgroud)
下面是我的配置细节:
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">
org.apache.activemq.jndi.ActiveMQInitialContextFactory</prop>
<prop key="java.naming.provider.url">vm://localhost:0</prop>
<prop key="java.naming.security.principal">system</prop>
<prop key="java.naming.security.credentials">system</prop>
</props>
</property>
</bean>
<bean id="jndiQueueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate" ref="jndiTemplate" />
<property name="jndiName" value="jmsFactory" />
</bean>
<bean id="queueConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jndiQueueConnectionFactory" />
<property name="sessionCacheSize" value="1" />
</bean>
<bean id="destinationResolver"
class="org.springframework.jms.support.destination.JndiDestinationResolver">
<property name="jndiTemplate" ref="jndiTemplate" />
<property name="cache" value="true" …
Run Code Online (Sandbox Code Playgroud) spring-jms ×10
java ×4
jms ×4
spring ×4
spring-boot ×4
amazon-sqs ×1
jakarta-ee ×1
jmstemplate ×1
junit ×1
rabbitmq ×1
spring-test ×1