我创建了一个基于spring,jms和activemq的简单的生产者消费者模拟,我试图从双方,生产者和消费者那里获得高性能,
连接设置:
<tx:annotation-driven />
<bean id="transactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<amq:connectionFactory id="amqConnectionFactory" brokerURL="failover:(tcp://${broker.url}:61616)" />
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory" />
</bean>
<amq:queue id="queue" physicalName="queue" />
<beans:bean id="jsonMessageConverter" class="XXXXX.converter.JsonMessageConverter" />
Run Code Online (Sandbox Code Playgroud)
消费者设置:
<jms:listener-container concurrency="10"
acknowledge="auto" prefetch="1" message-converter="jsonMessageConverter" transaction-manager="transactionManager"
>
<jms:listener id="queueListener_1" destination="ooIntegrationQueue"
ref="myMessageListenerAdapter" />
</jms:listener-container>
<beans:bean id="myMessageListenerAdapter"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter" >
<beans:property name="delegate" ref="consumer"/>
</beans:bean>
<beans:bean id="consumer" class="XXX.ConsumerImpl"/>
Run Code Online (Sandbox Code Playgroud)
制片人设置:
<beans:bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
p:connectionFactory-ref="connectionFactory" p:messageConverter-ref="jsonMessageConverter"
p:defaultDestination-ref="ooIntegrationQueue" p:sessionTransacted="true" />
Run Code Online (Sandbox Code Playgroud)
从消费者开始,我设法每秒消耗大约25条消息,这是非常慢的,我发现瓶颈是我正在使用交易,谷歌搜索后,玩配置,我发现自动装配DefaultMessageListenerContainer并将cachelevel更改为
listenerContainer.setCacheLevelName("CACHE_SESSION")
Run Code Online (Sandbox Code Playgroud)
我的性能增加到每秒约1500条消息,同时仍然有交易.
我的问题是现在生产者仍然停留在每秒约25次操作,我的生产者测试很简单:
int numOfMessages = getNumberOfMessages();
double startTime = System.currentTimeMillis();
for (int …
Run Code Online (Sandbox Code Playgroud) 当我使用Spring收听JMS消息时,我收到了上述错误.
我想知道如何将Errorhandler添加到JMS侦听器中?
在我的spring启动应用程序中,我配置了两个不同的MQQueueConnectionFactory实例(不同的id),因为它是应用程序的需要.为此,我添加了ibm客户端罐子.
我还在我的代码中添加了spring-jms依赖项,因为我想要JmsTemplate等类.添加此依赖项后,JmsAutoConfiguration在类路径中找到JmsTemplate并尝试配置bean.在这个过程中,它尝试注入ConnectionFactory类型的bean,这是代码失败的地方,我开始得到错误.以下是JmsAutoConfiguration的代码
@Configuration
@ConditionalOnClass(JmsTemplate.class)
@ConditionalOnBean(ConnectionFactory.class)
@EnableConfigurationProperties(JmsProperties.class)
@Import(JmsAnnotationDrivenConfiguration.class)
public class JmsAutoConfiguration {
@Autowired
private JmsProperties properties;
@Autowired
private ConnectionFactory connectionFactory;
@Autowired(required = false)
private DestinationResolver destinationResolver;
Run Code Online (Sandbox Code Playgroud)
我是否有机会随时关闭弹簧靴的JmsAutoconfiguration功能?如果没有,那么替代解决方案是什么?
如何在队列中的ActiveMQ中设置redeliveryPolicy?
1)在doc中,请参阅:activeMQ Redelivery,解释您应该在ConnectionFactory或Connection上设置它.但我想为不同的Queue使用不同的值.
2)除此之外,我似乎没有得到它的工作.在Spring中的连接工厂上设置它(我使用的是带有Spring 3.0的activemq 5.4.2),这似乎没有任何影响:
<amq:connectionFactory id="amqConnectionFactory" brokerURL="${jms.factory.url}" >
<amq:properties>
<amq:redeliveryPolicy maximumRedeliveries="6" initialRedeliveryDelay="15000" useExponentialBackOff="true" backOffMultiplier="5"/>
</amq:properties>
</amq:connectionFactory>
Run Code Online (Sandbox Code Playgroud)
我还尝试将其设置为已定义的队列上的属性,但似乎也会被忽略,因为重新传递比定义的值更早发生:
<amq:queue id="jmsQueueDeclarationSnd" physicalName="${jms.queue.declaration.snd}" >
<amq:properties>
<amq:redeliveryPolicy maximumRedeliveries="6" initialRedeliveryDelay="15000" useExponentialBackOff="true" backOffMultiplier="5"/>
</amq:properties>
</amq:queue>
Run Code Online (Sandbox Code Playgroud)
谢谢
我正在尝试基于@JmsListener注释为发布 - 订阅创建示例:https://github.com/lkrnac/book-eiws-code-samples/tree/master/05-jms/0515-publish-subscribe
相关代码段:
@Slf4j
@SpringBootApplication
@EnableScheduling
public class JmsPublishSubscribeApplication {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(JmsPublishSubscribeApplication.class, args);
}
@Bean
public ActiveMQTopic simpleTopic() {
return new ActiveMQTopic("simpleTopic");
}
}
@Component
public class SimpleMessageListener1 {
@JmsListener(destination = "simpleTopic")
public void readMessage(String message) {
//....
}
}
@Component
public class SimpleMessageListener2 {
@JmsListener(destination = "simpleTopic")
public void readMessage(String message) {
//....
}
}
Run Code Online (Sandbox Code Playgroud)
问题是得到这种行为:
2015-05-17 20:07:04.985 INFO 22983 --- [pool-1-thread-1] n.l.b.e.chapter05.SimpleMessageSender : Sending message: simple message …
Run Code Online (Sandbox Code Playgroud) 我想将Spring 4.1中提供的新注释和功能用于需要JMS侦听器的应用程序.
我仔细阅读了Spring 4.1 JMS改进帖子中的注释,但我仍然错过了我和设置应用程序之间的关系@JmsListener
,DestinationResolver
以及如何指示正确的Destination
或Endpoint
.
以下是@JmsListener的建议用法
@Component
public class MyService {
@JmsListener(containerFactory = "myContainerFactory", destination = "myQueue")
public void processOrder(String data) { ... }
}
Run Code Online (Sandbox Code Playgroud)
现在,我不能在我的实际代码中使用它,因为需要使用配置文件读取"myQueue" Environment.getProperty()
.
我可以设置一个合适的myContainerFactory DestinationResolver
但主要是,DynamicDestinationResolver
如果你不需要JNDI来查找app服务器中的队列而不需要做一些自定义的回复逻辑,你似乎只会使用它.我只是想了解Spring如何使用@JmsListener
注释以参数化方式指示队列的名称.
在博客文章的下方,我找到了对此Configurer的引用:
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
registrar.setDefaultContainerFactory(defaultContainerFactory());
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setDestination("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
Run Code Online (Sandbox Code Playgroud)
现在,这使得感一定量的,我可以看到,这将让我设定在一些外部串运行一个目标,但是这似乎是在冲突中使用@JmsListener
,因为它似乎是压倒一切赞成注释endpoint.setMessageListener
中上面的代码. …
这是一个非常普遍的问题,我发现很多解决方案在网络上对我不起作用.我宣布:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.5.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
和码头插件
<plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
</plugin>
Run Code Online (Sandbox Code Playgroud)
但是当我发出"mvn jetty:run"时,我继续得到以下异常:
org.springframework.beans.FatalBeanException:未找到命名空间[http://activemq.apache.org/schema/core]的NamespaceHandler类[org.apache.xbean.spring.context.v2.XBeanNamespaceHandler]; 嵌套异常是java.lang.ClassNotFoundException:org.apache.xbean.spring.context.v2.XBeanNamespaceHandler
运行Jetty,ActiveMQ和Spring JMS的正确maven依赖是什么?
我正在阅读关于DefaultMessageListenerContainer的 spring文档
它说:"注意:不要将Spring的CachingConnectionFactory与动态扩展结合使用.理想情况下,根本不要将它与消息监听器容器一起使用,因为通常最好让监听器容器本身在其生命周期内处理适当的缓存.此外,停止和重新启动侦听器容器只能使用独立的本地缓存连接 - 而不能使用外部缓存连接."
谁有人解释原因?
我需要将消息从一个ActiveMQ实例上的队列移动到另一个ActiveMQ实例.有没有办法使用spring boot配置连接到两个不同的ActiveMQ实例?
我需要创建多个connectionFactories吗?如果是这样,那么JmsTemplate如何知道连接哪个ActiveMQ实例?
@Bean
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(JMS_BROKER_URL);
}
Run Code Online (Sandbox Code Playgroud)
任何帮助和代码示例都很有用.
提前致谢.GM
我有一个简单的 Spring Boot 服务,它使用 JMSTemplate 侦听 AWS SQS 队列。当消息得到正确处理时,一切都按预期工作。
我正在使用 CLIENT_ACKNOWLEDGE,因此在处理过程中抛出异常时,会再次收到消息。但是,SQS 队列上的默认可见性超时设置将被忽略,并且会立即再次接收消息。
SQS 队列配置了 30 秒的默认可见性超时,并且在将消息放入 DLQ 之前接收的重新驱动策略为 20。
我已禁用该服务并使用 SQS 控制台来验证是否正确设置了默认可见性超时。我还尝试将 JMS 消息添加到方法签名并执行手动验证。
这是 JMS 配置的代码:
@Configuration
@EnableJms
class JmsConfig
{
@Bean
@Conditional(AWSEnvironmentCondition.class)
public SQSConnectionFactory connectionFactory(@Value("${AWS_REGION}") String awsRegion)
{
return new SQSConnectionFactory(
new ProviderConfiguration(),
AmazonSQSClientBuilder.standard()
.withRegion(Regions.fromName(awsRegion))
.withCredentials(new DefaultAWSCredentialsProviderChain())
);
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory)
{
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency("3-10");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setErrorHandler(defaultErrorHandler());
return factory;
}
@Bean
public ErrorHandler defaultErrorHandler()
{
return new ErrorHandler()
{ …
Run Code Online (Sandbox Code Playgroud) spring-jms ×10
spring ×7
java ×3
jms ×2
spring-boot ×2
amazon-sqs ×1
jetty ×1
jmstemplate ×1
maven ×1