在JBoss 5.1.0上,我使用*-ds.xml(标准jboss DS)配置了Datasource(PostgreSQL 8.3.11).它使用XADataSource(PGXADataSource).我也有ActiveMQ代理(现在它在VM中运行,在JBoss下,但它将在单独的服务器上运行).
我想做的是让ActiveMQ连接工厂和数据源参与XA事务.例如,我想更新DB记录并将UMS消息作为UOW发送.你明白了.
我在my-pg-ds.xml中配置了PGXADataSource并且它可以工作(我可以跟踪执行到PGXAConnection的start方法).我曾尝试直接在Spring中配置ActiveMQXAConnectionFactory(我使用的是Spring 3.0.2.RELEASE),但这不起作用,因为在这种情况下是Spring事务管理器(我使用注释让Spring配置JtaTransactionManager,它只需将所有工作委托给Jboss事务管理器)没有为给定的ActiveMQXAConnection登记XAResource .每当我尝试发送消息时,我都会收到异常JMSException,说"会话的XAResource尚未在分布式事务中登记".从ActiveMQXASession抛出.
由于这不起作用,我已切换到ActiveMQ ConnectionFactory的JCA配置(基于此文档),它适用于常规ConnectionFactory,但我不明白如何配置它以使用XAConnectionFactory.看起来资源适配器根本没有适用于XA连接工厂的ManagedConnectionFactory,ManagedConnection等实现.
我错过了什么或者我别无选择,只能为资源适配器编写XA包装器?
所以我有一个ActiveMQ代理和一些生产者,它们在代理中放置了一些自制对象.我也有一些消费者接收这些消息(序列化对象)没有问题.但我想创建一个连接到代理并显示所有消息(序列化对象)的工具.
我尝试使用和ActiveMQConnection这样做:
Set<ActiveMQQueue> currentMessageQueues = activeMQConnection.getDestinationSource().getQueues();
Iterator<ActiveMQQueue> messageQueueIterator = currentMessageQueues.iterator();
while (messageQueueIterator.hasNext()) {
ActiveMQQueue currentQueue = messageQueueIterator.next();
QueueSession queueSession = activeMQConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = queueSession.createBrowser(currentQueue);
Enumeration<?> messagesInQueue = browser.getEnumeration();
while (messagesInQueue.hasMoreElements()) {
Message queueMessage = (Message) messagesInQueue.nextElement();
if (queueMessage instanceof ActiveMQObjectMessage) {
ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) queueMessage;
objectMessage.getObject();
}
}
}
Run Code Online (Sandbox Code Playgroud)
使用此代码,我在objectMessage.getObject()上得到一个异常:
javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: MyMessage
Run Code Online (Sandbox Code Playgroud)
我使用了调试模式,ActiveMQObjectMessage将对象设置为null.
这种方法是否良好,如果是,我做错了什么?我怎样才能从经纪人那里收到这个对象?
这个问题
如何配置ActiveMQ和<flow>进入Mule ESB 3.2,以确保从队列中提取的消息最终由外部正确处理CXF service?
脚本
我有一个CXF端点,它应该接收传入消息并尽快将其传输到三个外部服务.我们称它们为EX1,EX2,EX3.由于<all>Mule 3.x中引入了组件,这非常简单.
整个解决方案最重要的要求是确保每个收到的消息最终都交付给所有三个CXF服务.所以我们最终得出了这个想法,将每个传入的消息放入Persistent JMS queues(Q1,Q2,Q3).在从队列Qn读取消息之后,它直接转移到相应的EXn端点,因此 - 外部服务.
配置
(我可以根据要求提供完整的配置)
我们已经按照此处的描述配置了ActiveMQ代理,并将其与我们的<flow>配置连接起来.一切似乎按预期工作,我有JConsole连接到我的应用程序,所以我可以看到消息是PERSISTENT类型,他们最终排队等候.如果一切顺利 - 所有三个服务EXn都会收到消息.
测试
当我们关闭其中一个服务时,问题就出现了,让我们说EX2,并重新启动整个服务器模拟失败.消息最终会丢失(我想这不是那么持久,是吧?).最奇怪的是 - 如果我们在EX2关闭时发送了10条消息,那么在服务器重启后,其中9条正在被重新传送!所以我想也许,也许,这10条消息中有9条已经正确入队,而当服务器发生故障时,这条消息中的9条经常被重新传送.
这让我觉得,CXF端点没有处理事务支持,说实话我无法理解.毕竟,当我尝试重新传递时,我可以看到消息在队列中,所以它应该被保留.这显然不是,但为什么呢?
我自己的尝试 我已经尝试过很多东西,但都没有.总是有一条消息丢失.
<jms:transaction />在流程中使用任何标签 - 不起作用<cxf:jaxws-client /><xa-transaction />- 并没有用<default-exception-strategy>配置 - 如果我记得它让事情变得更糟任何帮助表示赞赏,谢谢.
ACTIVE MQ CONFIGURATION
<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
<spring:property name="queue" value="queue.*"/>
<spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>
<spring:bean …Run Code Online (Sandbox Code Playgroud) 我一直在使用嵌入式activeMQ服务器配置(配置非常类似于说明嵌入式activeMQ服务器/监听器概念的示例).作为应用程序的一部分,我有一个监视目录的监视线程.我没有必要终止关闭activeMQ服务器的进程,而是引入了对"STOP"文件的检查,如果文件存在,服务器将关闭.我试图通过调用代理上的stop函数来完成关闭.
(broker.stop())这似乎关闭了服务.但是,在System.exit上,会引发异常错误:
我观察到另一个提到类似行为的帖子的响应表明"这是正常的"行为.另一篇文章指出我应该禁用shutdownhook(通过设置useShutdownHook ="false"来通过代理的Spring配置).这也没有解决问题.
有关此问题的任何帮助表示赞赏.
谢谢
这是异常堆栈跟踪:
17:10:20.056 [Thread-9] DEBUG org.apache.activemq.AdvisoryConsumer - Failed to send remove command: javax.jms.JMSException: Peer (vm://localhost#3) disposed.
javax.jms.JMSException: Peer (vm://localhost#3) disposed.
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62) ~[activemq-core-5.5.1.jar:5.5.1]
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1267) [activemq-core-5.5.1.jar:5.5.1]
at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259) [activemq-core-5.5.1.jar:5.5.1]
at org.apache.activemq.AdvisoryConsumer.dispose(AdvisoryConsumer.java:56) ~[activemq-core-5.5.1.jar:5.5.1]
at org.apache.activemq.ActiveMQConnection.close(ActiveMQConnection.java:615) [activemq-core-5.5.1.jar:5.5.1]
at org.springframework.jms.connection.SingleConnectionFactory.closeConnection(SingleConnectionFactory.java:426) [spring-jms-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.jms.connection.SingleConnectionFactory.resetConnection(SingleConnectionFactory.java:321) [spring-jms-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.jms.connection.SingleConnectionFactory.destroy(SingleConnectionFactory.java:312) [spring-jms-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.beans.factory.support.DisposableBeanAdapter.destroy(DisposableBeanAdapter.java:211) [spring-beans-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroyBean(DefaultSingletonBeanRegistry.java:498) [spring-beans-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingleton(DefaultSingletonBeanRegistry.java:474) [spring-beans-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingletons(DefaultSingletonBeanRegistry.java:442) [spring-beans-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1066) [spring-context-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1040) [spring-context-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.context.support.AbstractApplicationContext$1.run(AbstractApplicationContext.java:958) [spring-context-3.1.0.RELEASE.jar:3.1.0.RELEASE]
Caused by: org.apache.activemq.transport.TransportDisposedIOException: Peer (vm://localhost#3) disposed.
at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:89) ~[activemq-core-5.5.1.jar:5.5.1]
at …Run Code Online (Sandbox Code Playgroud) 我希望能够从同一个JVM中的嵌入式ActiveMQ(5.4.2)代理中获取从java中搜索主题的消费者数量.JMX真的是唯一的选择吗?JMX似乎是一个糟糕的选择,因为它可能被选中禁用.这篇文章展示了如何使用JMX获取连接列表:ActiveMQ:通过JMX获取连接列表?
我更喜欢基于非JMX的解决方案,因为它可能被禁用.我猜JMX如果在禁用时仍然可以从java中使用它就没问题.我只是熟悉启用/禁用它以与jconsole一起使用.
我错过了API中的内容吗?
我要求设计一个通知/发布,其子系统发送电子邮件的应用程序.我计划使用jms发布/订阅(主题)消息来做到这一点
现在的可见性,将有20到30个订阅者,并且将要发布的消息数量将在每天30000到50000个消息的范围内.
我计划使用ActiveMQ JMS + Spring 3 + Tomcat 6实现 问题
我是JMS的新手,我想知道上面的负载是否很高?
我们真的需要在服务器上部署单独的ActiveMQ,还是我们在Webapp中使用嵌入式ActiveMQ就足够了?
单独的ActiveMQ服务器/嵌入式服务器有哪些优点/缺点?
我列出了每个队列中的队列和消息.以下是我的代码.但是,QueueBrowser无法正确检索邮件.
比方说,我有一个名为TestQueue的队列,它有1000条消息.
我第一次运行我的程序时它只显示200条消息.第二 - 400第三 - 600第四 - 800第五 - 1000
你能告诉我如何解决这个问题吗?
ConnectionFactory out = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.all=10000");
ActiveMQConnection connection = (ActiveMQConnection) out.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Set<ActiveMQQueue> amqs = connection.getDestinationSource().getQueues();
Iterator<ActiveMQQueue> queues = amqs.iterator();
while ( queues.hasNext() )
{
ActiveMQQueue queue_t = aqueues.next();
String q_name = queue_t.getPhysicalName();
List<ActiveMQMessage> msgList = ((ActiveMQSession) session).getUnconsumedMessages();
System.out.println( "\nQueue = " + q_name);
QueueBrowser queueBrowser = session.createBrowser(queue_t);
Enumeration e = queueBrowser.getEnumeration();
int numMsgs = 0;
while(e.hasMoreElements())
{
Message message = (Message) e.nextElement();
numMsgs++; …Run Code Online (Sandbox Code Playgroud) 我有一个线程挂起的问题,我在我的线程转储中看到以下内容:
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:129)
java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
java.io.BufferedInputStream.read1(BufferedInputStream.java:258)
java.io.BufferedInputStream.read(BufferedInputStream.java:317)
sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:687)
sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:632)
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1195)
java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:379)
org.springframework.remoting.httpinvoker.SimpleHttpInvokerRequestExecutor.validateResponse(SimpleHttpInvokerRequestExecutor.java:146)
org.springframework.remoting.httpinvoker.SimpleHttpInvokerRequestExecutor.doExecuteRequest(SimpleHttpInvokerRequestExecutor.java:66)
org.springframework.remoting.httpinvoker.AbstractHttpInvokerRequestExecutor.executeRequest(AbstractHttpInvokerRequestExecutor.java:136)
org.springframework.remoting.httpinvoker.HttpInvokerClientInterceptor.executeRequest(HttpInvokerClientInterceptor.java:192)
org.springframework.remoting.httpinvoker.HttpInvokerClientInterceptor.executeRequest(HttpInvokerClientInterceptor.java:174)
org.springframework.remoting.httpinvoker.HttpInvokerClientInterceptor.invoke(HttpInvokerClientInterceptor.java:142)
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
$Proxy117.SigmaCruxer(Unknown Source)
com.tms.SigmaClient.SigmaClient.processMessage(SigmaClient.java:46)
com.tms.SigmaClient.SigmaServiceEndpoint.doSigma(SigmaServiceEndpoint.java:29)
com.tms.SigmaClient.SigmaServiceEndpoint.mark(SigmaServiceEndpoint.java:43)
sun.reflect.GeneratedMethodAccessor193.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
java.lang.reflect.Method.invoke(Method.java:597)
org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:329)
org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:231)
org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:169)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:104)
org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:74)
org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:102)
org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:72)
org.apache.camel.impl.converter.AsyncProcessorTypeConverter$ProcessorToAsyncProcessorBridge.process(AsyncProcessorTypeConverter.java:50)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:114)
org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:284)
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:109)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:69)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:99)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:318)
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:209)
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:305)
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:102)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:69)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:104)
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:85)
org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:91)
org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:560)
org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:498)
org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
java.lang.Thread.run(Thread.java:662)
Run Code Online (Sandbox Code Playgroud)
根据这两个技术:( …
我正在尝试构建一个基于运行ActiveMQ的Spring Websocket Demo作为带有Undertow的STOMP消息代理的websocket消息传递应用程序.应用程序在不安全的连接上运行良好.但是,我很难配置STOMP Broker Relay以转发SSL连接.
正如Spring WebSocket Docs中提到的......
上述配置中的"STOMP代理中继"是Spring MessageHandler,它通过将消息转发到外部消息代理来处理消息.为此,它建立到代理的TCP连接,将所有消息转发给它,然后通过其WebSocket会话将从代理接收的所有消息转发给客户端.从本质上讲,它充当"转发",可以在两个方向上转发消息.
此外,文档说明我对反应堆网的依赖...
请在org.projectreactor上添加依赖项:reactor-net用于TCP连接管理.
问题是我当前的实现没有通过SSL 初始化NettyTCPClient,因此ActiveMQ连接失败并出现SSLException.
[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED:
[id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442]
...
[o.a.a.b.TransportConnection.Transport:245] »
Transport Connection to: tcp://127.0.0.1:17779 failed:
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
...
Run Code Online (Sandbox Code Playgroud)
因此,我试图研究Project Reactor Docs来为连接设置SSL选项,但我没有成功.
在这一点上我已经找到了StompBrokerRelayMessageHandler初始化NettyTCPClient默认情况下Reactor2TcpClient然而,这似乎并没有配置.
非常感谢协助.
SSCCE
app.props
spring.activemq.in-memory=true
spring.activemq.pooled=false
spring.activemq.broker-url=stomp+ssl://localhost:8442
server.port=8443
server.ssl.enabled=true
server.ssl.protocol=tls
server.ssl.key-alias=undertow
server.ssl.key-store=classpath:undertow.jks
server.ssl.key-store-password=xxx
server.ssl.trust-store=classpath:undertow_certs.jks
server.ssl.trust-store-password=xxx
Run Code Online (Sandbox Code Playgroud)
WebSocketConfig
//...
@Configuration
@EnableWebSocketMessageBroker
public …Run Code Online (Sandbox Code Playgroud) 尝试使用Spring 4和ActiveMQ实现非XML JMS侦听器.我的问题是我的客户端一直遇到以下错误:
Setup of JMS message listener invoker failed for destination 'topic.FromJndiProperties' [...]
Cause: The JMS connection has failed: Force close due to SecurityException on connect.
Cause: User name [null] or password is invalid.
Run Code Online (Sandbox Code Playgroud)
因此,使用用户名和密码null建立与目标的连接.我想我可能没有正确设置destinationResolver但我不知道如何解决这个问题.谁能帮我解决这个问题?
我的AppConfig:
@Autowired
private Environment env;
@Autowired
private BeanFactory springContextBeanFactory;
@Bean
public DefaultJmsListenerContainerFactory myListenerContainerFactory() throws NamingException {
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY, env.getProperty("java.naming.factory.initial"));
props.setProperty(Context.PROVIDER_URL, env.getProperty("java.naming.provider.url"));
props.setProperty(Context.SECURITY_PRINCIPAL, env.getProperty("java.naming.security.principal"));
props.setProperty(Context.SECURITY_CREDENTIALS, env.getProperty("java.naming.security.credentials"));
Context jndiContext = new InitialContext(props);;
ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); …Run Code Online (Sandbox Code Playgroud)