JMS主题和JMS选择器是完全不同的概念,但它们都可以被消费者使用以仅获取消息的子集.
在PubSub场景中,使用以下方法过滤消息的专业人员和骗子是什么?
选项1)在单个主题中发布所有内容,使用者使用JMS选择器
选项2)订阅者订阅一个或多个主题.
例如:
我正在发布客户端数据消息,订阅者可能只想获得特定类型的客户端.客户端类型在不同的属性中定义,如'ClientGroup''ClientSource''ClientOrgUnit''ClientSize'等
我该怎么办:
1)为每个客户端类型创建一个不同的主题
或
2)在主题上并让每个类型都在JMS属性中并让订阅者使用选择器?
谢谢
配置:Redhat 5.3,Sun Java 1.6.31,ActiveMQ 5.4.2
我每次停止activemq都会收到异常($> service activemq stop)
ACTIVEMQ_HOME: /opt/apache/apache-activemq-5.4.2
ACTIVEMQ_BASE: /opt/apache/apache-activemq-5.4.2
Connecting to JMX URL: service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
ERROR: java.lang.RuntimeException: Failed to execute stop task. Reason: java.io.IOException: Failed to retrieve RMIServer stub: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: localhost; nested exception is:
java.net.ConnectException: Connection refused]
java.lang.RuntimeException: Failed to execute stop task. Reason: java.io.IOException: Failed to retrieve RMIServer stub: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: localhost; nested exception is:
java.net.ConnectException: Connection refused]
at org.apache.activemq.console.command.ShutdownCommand.runTask(ShutdownCommand.java:107)
at …Run Code Online (Sandbox Code Playgroud) 我有一个使用ActiveMQ版本5.10的Spring JMS应用程序.我正在执行简单的并发测试.我使用Spring Boot,当前版本和注释来配置JMSListener和消息生成器.
消息生成器只是尽可能快地在队列上抛出消息.消息侦听器将消息从队列中拉出,但在获取消息后休眠1秒钟 - 模拟消息侦听器在获取消息后需要执行的一些工作.
我将JMSListener设置为100-1000个并发线程.如果我开始在同一时间的消息生产者和消费者(无论是在自己的JVM上运行),消费者从来没有得到最低配置的螺纹上方,尽管最大范围设为1000.
如果我让生产者首先开始并在队列上放置几千条消息,然后启动一个或多个消费者实例,它将稳定地提升线程,从100开始,然后每秒20个左右的线程,直到达到状态队列中有大约20-30条消息在飞行中.它永远不会捕获生成器 - 即使消费者没有接近其maxConcurrency计数,也总会有一些消息在队列中.
为什么消息使用者没有突然进入一堆额外的线程来清空队列而不是让队列中有20-30条消息呢?消费者是否有办法继续快速添加线程以便赶上队列中的消息?
以下是代码的相关部分.
消息制作者
@Component
public class ClientServiceImpl implements ClientService {
private static final String QUEUE="message.test.queue";
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void submitMessage(ImportantMessage importantMessage) {
System.out.println("*** Sending " + importantMessage);
jmsTemplate.convertAndSend(QUEUE, importantMessage);
}
}
Run Code Online (Sandbox Code Playgroud)
消息消费者
@SpringBootApplication
@EnableJms
public class AmqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(AmqConsumerApplication.class, args);
}
@Value("${JMSHost}")
private String JMS_BROKER_URL;
@Autowired
static Command command;
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactory factory= new ActiveMQConnectionFactory(JMS_BROKER_URL);
((ActiveMQConnectionFactory)factory).setTrustAllPackages(true);
((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
((ActiveMQConnectionFactory)factory).setAlwaysSessionAsync(false); …Run Code Online (Sandbox Code Playgroud) 我有spring的配置和一个完整的功能stomp代理(activemq):
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
private static Logger LOG = org.slf4j.LoggerFactory.getLogger(WebsocketConfig.class);
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic/", "/queue/");
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/socket").withSockJS();
}
}
Run Code Online (Sandbox Code Playgroud)
天真地,我虽然spring使用了我当前的activemq配置,但实际上它尝试使用默认的stomp端口连接到localhost中的服务器.我发现可以通过输入以下内容来更改此配置:
config.enableStompBrokerRelay("/topic/", "/queue/")
.setRelayHost("activeMQHOST")
.setRelayPort(9999);
Run Code Online (Sandbox Code Playgroud)
多数民众赞成,但目前我有两个经纪人的故障转移设置(master/flave with shared file system).如何为stomp broker relay配置这样的设置?
如果不可能,我想在以下解决方案中:
第二种选择是可取的吗?
在JBoss 5.1.0上,我使用*-ds.xml(标准jboss DS)配置了Datasource(PostgreSQL 8.3.11).它使用XADataSource(PGXADataSource).我也有ActiveMQ代理(现在它在VM中运行,在JBoss下,但它将在单独的服务器上运行).
我想做的是让ActiveMQ连接工厂和数据源参与XA事务.例如,我想更新DB记录并将UMS消息作为UOW发送.你明白了.
我在my-pg-ds.xml中配置了PGXADataSource并且它可以工作(我可以跟踪执行到PGXAConnection的start方法).我曾尝试直接在Spring中配置ActiveMQXAConnectionFactory(我使用的是Spring 3.0.2.RELEASE),但这不起作用,因为在这种情况下是Spring事务管理器(我使用注释让Spring配置JtaTransactionManager,它只需将所有工作委托给Jboss事务管理器)没有为给定的ActiveMQXAConnection登记XAResource .每当我尝试发送消息时,我都会收到异常JMSException,说"会话的XAResource尚未在分布式事务中登记".从ActiveMQXASession抛出.
由于这不起作用,我已切换到ActiveMQ ConnectionFactory的JCA配置(基于此文档),它适用于常规ConnectionFactory,但我不明白如何配置它以使用XAConnectionFactory.看起来资源适配器根本没有适用于XA连接工厂的ManagedConnectionFactory,ManagedConnection等实现.
我错过了什么或者我别无选择,只能为资源适配器编写XA包装器?
这个问题
如何配置ActiveMQ和<flow>进入Mule ESB 3.2,以确保从队列中提取的消息最终由外部正确处理CXF service?
脚本
我有一个CXF端点,它应该接收传入消息并尽快将其传输到三个外部服务.我们称它们为EX1,EX2,EX3.由于<all>Mule 3.x中引入了组件,这非常简单.
整个解决方案最重要的要求是确保每个收到的消息最终都交付给所有三个CXF服务.所以我们最终得出了这个想法,将每个传入的消息放入Persistent JMS queues(Q1,Q2,Q3).在从队列Qn读取消息之后,它直接转移到相应的EXn端点,因此 - 外部服务.
配置
(我可以根据要求提供完整的配置)
我们已经按照此处的描述配置了ActiveMQ代理,并将其与我们的<flow>配置连接起来.一切似乎按预期工作,我有JConsole连接到我的应用程序,所以我可以看到消息是PERSISTENT类型,他们最终排队等候.如果一切顺利 - 所有三个服务EXn都会收到消息.
测试
当我们关闭其中一个服务时,问题就出现了,让我们说EX2,并重新启动整个服务器模拟失败.消息最终会丢失(我想这不是那么持久,是吧?).最奇怪的是 - 如果我们在EX2关闭时发送了10条消息,那么在服务器重启后,其中9条正在被重新传送!所以我想也许,也许,这10条消息中有9条已经正确入队,而当服务器发生故障时,这条消息中的9条经常被重新传送.
这让我觉得,CXF端点没有处理事务支持,说实话我无法理解.毕竟,当我尝试重新传递时,我可以看到消息在队列中,所以它应该被保留.这显然不是,但为什么呢?
我自己的尝试 我已经尝试过很多东西,但都没有.总是有一条消息丢失.
<jms:transaction />在流程中使用任何标签 - 不起作用<cxf:jaxws-client /><xa-transaction />- 并没有用<default-exception-strategy>配置 - 如果我记得它让事情变得更糟任何帮助表示赞赏,谢谢.
ACTIVE MQ CONFIGURATION
<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
<spring:property name="queue" value="queue.*"/>
<spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>
<spring:bean …Run Code Online (Sandbox Code Playgroud) 我希望能够从同一个JVM中的嵌入式ActiveMQ(5.4.2)代理中获取从java中搜索主题的消费者数量.JMX真的是唯一的选择吗?JMX似乎是一个糟糕的选择,因为它可能被选中禁用.这篇文章展示了如何使用JMX获取连接列表:ActiveMQ:通过JMX获取连接列表?
我更喜欢基于非JMX的解决方案,因为它可能被禁用.我猜JMX如果在禁用时仍然可以从java中使用它就没问题.我只是熟悉启用/禁用它以与jconsole一起使用.
我错过了API中的内容吗?
我要求设计一个通知/发布,其子系统发送电子邮件的应用程序.我计划使用jms发布/订阅(主题)消息来做到这一点
现在的可见性,将有20到30个订阅者,并且将要发布的消息数量将在每天30000到50000个消息的范围内.
我计划使用ActiveMQ JMS + Spring 3 + Tomcat 6实现 问题
我是JMS的新手,我想知道上面的负载是否很高?
我们真的需要在服务器上部署单独的ActiveMQ,还是我们在Webapp中使用嵌入式ActiveMQ就足够了?
单独的ActiveMQ服务器/嵌入式服务器有哪些优点/缺点?
我列出了每个队列中的队列和消息.以下是我的代码.但是,QueueBrowser无法正确检索邮件.
比方说,我有一个名为TestQueue的队列,它有1000条消息.
我第一次运行我的程序时它只显示200条消息.第二 - 400第三 - 600第四 - 800第五 - 1000
你能告诉我如何解决这个问题吗?
ConnectionFactory out = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.all=10000");
ActiveMQConnection connection = (ActiveMQConnection) out.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Set<ActiveMQQueue> amqs = connection.getDestinationSource().getQueues();
Iterator<ActiveMQQueue> queues = amqs.iterator();
while ( queues.hasNext() )
{
ActiveMQQueue queue_t = aqueues.next();
String q_name = queue_t.getPhysicalName();
List<ActiveMQMessage> msgList = ((ActiveMQSession) session).getUnconsumedMessages();
System.out.println( "\nQueue = " + q_name);
QueueBrowser queueBrowser = session.createBrowser(queue_t);
Enumeration e = queueBrowser.getEnumeration();
int numMsgs = 0;
while(e.hasMoreElements())
{
Message message = (Message) e.nextElement();
numMsgs++; …Run Code Online (Sandbox Code Playgroud) 我正在尝试构建一个基于运行ActiveMQ的Spring Websocket Demo作为带有Undertow的STOMP消息代理的websocket消息传递应用程序.应用程序在不安全的连接上运行良好.但是,我很难配置STOMP Broker Relay以转发SSL连接.
正如Spring WebSocket Docs中提到的......
上述配置中的"STOMP代理中继"是Spring MessageHandler,它通过将消息转发到外部消息代理来处理消息.为此,它建立到代理的TCP连接,将所有消息转发给它,然后通过其WebSocket会话将从代理接收的所有消息转发给客户端.从本质上讲,它充当"转发",可以在两个方向上转发消息.
此外,文档说明我对反应堆网的依赖...
请在org.projectreactor上添加依赖项:reactor-net用于TCP连接管理.
问题是我当前的实现没有通过SSL 初始化NettyTCPClient,因此ActiveMQ连接失败并出现SSLException.
[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED:
[id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442]
...
[o.a.a.b.TransportConnection.Transport:245] »
Transport Connection to: tcp://127.0.0.1:17779 failed:
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
...
Run Code Online (Sandbox Code Playgroud)
因此,我试图研究Project Reactor Docs来为连接设置SSL选项,但我没有成功.
在这一点上我已经找到了StompBrokerRelayMessageHandler初始化NettyTCPClient默认情况下Reactor2TcpClient然而,这似乎并没有配置.
非常感谢协助.
SSCCE
app.props
spring.activemq.in-memory=true
spring.activemq.pooled=false
spring.activemq.broker-url=stomp+ssl://localhost:8442
server.port=8443
server.ssl.enabled=true
server.ssl.protocol=tls
server.ssl.key-alias=undertow
server.ssl.key-store=classpath:undertow.jks
server.ssl.key-store-password=xxx
server.ssl.trust-store=classpath:undertow_certs.jks
server.ssl.trust-store-password=xxx
Run Code Online (Sandbox Code Playgroud)
WebSocketConfig
//...
@Configuration
@EnableWebSocketMessageBroker
public …Run Code Online (Sandbox Code Playgroud) activemq-classic ×10
java ×6
jms ×4
java-ee ×3
jmx ×2
spring ×2
stomp ×2
concurrency ×1
cxf ×1
ibm-mq ×1
jboss ×1
mule ×1
spring-boot ×1
websocket ×1