我们在 ActiveMQ 中遇到了一个问题,我们有大量的消息没有脱离主题。主题设置为非持久性、非持久性。我们的 Activemq.xml 文件是
<beans>
<broker xmlns="http://activemq.apache.org/schema/core" useJmx="false" persistent="false">
<!--
<persistenceAdapter>
<journaledJDBC journalLogFiles="5" dataDirectory="../data"/>
</persistenceAdapter>
-->
<transportConnectors>
<transportConnector uri="vm://localhost"/>
</transportConnectors>
</broker>
</beans>
Run Code Online (Sandbox Code Playgroud)
我们在 messages-config.xml 中的主题定义是
<destination id="traceChannel">
<properties>
<network>
<session-timeout>10</session-timeout>
</network>
<server>
<message-time-to-live>10000</message-time-to-live>
<durable>false</durable>
<durable-store-manager>flex.messaging.durability.FileStoreManager</durable-store-manager>
</server>
<jms>
<destination-type>Topic</destination-type>
<message-type>javax.jms.ObjectMessage</message-type>
<connection-factory>ConnectionFactory</connection-factory>
<destination-jndi-name>dynamicTopics/traceTopic</destination-jndi-name>
<delivery-mode>NON_PERSISTENT</delivery-mode>
<message-priority>DEFAULT_PRIORITY</message-priority>
<acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
<transacted-sessions>false</transacted-sessions>
<initial-context-environment>
<property>
<name>Context.INITIAL_CONTEXT_FACTORY</name>
<value>org.apache.activemq.jndi.ActiveMQInitialContextFactory</value>
</property>
<property>
<name>Context.PROVIDER_URL</name>
<value>tcp://localhost:61616</value>
</property>
</initial-context-environment>
</jms>
</properties>
<channels>
<channel ref="rtmps" />
</channels>
<adapter ref="trace" />
</destination>
Run Code Online (Sandbox Code Playgroud)
我想要实现的是,在任何时候都只有最后 10 条消息是关于主题的,因为让它在一夜之间运行会导致超过 150K 的关于该主题的消息,即使它应该只包含很少的数字。
我正在设计一个具有多个组件的应用程序,主要是用 java 和 python 编写的。我正在考虑使用“JMS-Active MQ”作为组件和“协议缓冲区”的面向消息的中间件。
1)这是前进的好方法吗?在我们的例子中,“消息大小”可以超过 10MB,协议缓冲区是否仍然具有跨组件通信的优势?对于可以处理“海量数据”的跨平台应用程序,是否有更好的通信“协议”?
2)我创建了一个概念证明,我通过“ActiveMQ”发送“协议buff”作为消息,我使用的是google的java教程中的示例proto文件。
AddressBook.Builder book = AddressBook.newBuilder();
Person.Builder person = Person.newBuilder();
person.setName("mayank");
person.setId(2);
book.addPerson(person);
TextMessage message = session.createTextMessage();
message.setText(book.build().toString());
Run Code Online (Sandbox Code Playgroud)
在另一个 java 应用程序中,我听了这条消息并尝试将其反序列化回 AddressBook 对象:
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println(msg.getText());
CodedInputStream stream =CodedInputStream.newInstance(msg.getText().getBytes());
AddressBook book = AddressBook.parseFrom(stream);
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
这会导致异常:
com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol
message, the input ended unexpectedly in the middle of a field. …Run Code Online (Sandbox Code Playgroud) 我正在尝试使事务成为整个Spring集成流,流程从适配器开始到IBM MQ Queue,然后我们有一个带有ActiveMQ队列的复杂流程,我得到一个com.atomikos.icatch.HeurHazardException :当atomikos尝试注册资源时的启发式异常.
这是我的applicationContext.xml.
<bean id="mqConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="hostName" value="${ibm.mq.connection.url}" />
<property name="port" value="${ibm.mq.connection.port}" />
<property name="transportType" value="${ibm.mq.conection.type}" />
<property name="queueManager" value="${ibm.mq.conection.queuemanager}" />
<property name="channel" value="${ibm.mq.conection.channel}" />
</bean>
<bean id="mqConnectionFactoryCache"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="mqConnectionFactory" />
<property name="sessionCacheSize" value="10" />
<property name="cacheConsumers" value="true"></property>
</bean>
<bean id="mqInboundQueue" class="com.ibm.mq.jms.MQQueue">
<constructor-arg value="${ibm.mq.conection.queue}" />
</bean>
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<!-- brokerURL -->
<property name="brokerURL" value="${active.mq.connection.url}" />
<property name="redeliveryPolicy" ref="amqRedeliveryPolicy"></property>
</bean>
<bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="maximumRedeliveries" value="5"></property>
<property name="redeliveryDelay" value="1000"></property>
</bean>
<bean id="atmConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
init-method="init" destroy-method="close"> …Run Code Online (Sandbox Code Playgroud) activemq-classic spring-integration atomikos spring-transactions ibm-mq
我正在使用 Java 的 Paho 客户端通过 mqtt 连接到 activeMq。我注意到一件奇怪的事情。创建了几个文件夹,其名称类似于“paho101658642587966-tcp1270011883”并且具有空的 .lck 文件。为什么使用这些以及它们是何时创建的。
任何人都可以清楚地解释 JMS Topic 中什么是持久和非持久?
到目前为止,我有服务器和客户端模型,服务器将请求发送到客户端以完成从服务器发送的请求,并且客户端接受请求蚂蚁将响应发送回服务器。
我有一个场景,不是服务器发起请求发送给我的客户端,我的客户端必须查看服务器以获取特定请求并将响应发送回服务器。
我可以应用 JMS 主题来解决这个问题吗?如果有人清楚地解释,将不胜感激。提前致谢。
我是新来的ActiveMQ。我有 2 queues:parser-queue和generation-queue. 我的应用程序在2 个不同的服务器上运行,两者都在侦听队列。我的工作流程非常简单,从解析器队列获取消息,处理它并在工作完成后将另一条消息放入生成队列。
但是,如果在我的工作过程中,获取消息并处理它,我的应用程序将关闭,或者由于任何原因而中断。
未正确处理的同一消息如何再次发送到我的队列以进行处理?
我正在阅读有关订阅恢复策略的内容,但这似乎是一个非常复杂的主题,我不确定是否要使用基于时间的策略,因为我的工作可能会随着时间的推移而变化以完全由我的应用程序处理。
我需要在运行 activeMQ 的远程主机上运行测试,我想通过将队列名称与本地文件进行比较来验证队列名称是否正确。我使用 bstat 来获取 activeMQ 的状态,然后 grep 具有 destionName 的部分,但我想知道是否有更好的方法通过命令行或 shell 而不是 java 来做到这一点?
提前致谢!
我需要一种方法在ActiveMQ中获取队列大小而不使用JMX或使用JMS迭代所有队列.有没有使用ActiveMQ API获取队列大小的解决方案.
简而言之,如果我从 Maven 运行下面的 JMeter 计划测试,它会给出结果:
javax.naming.NamingException: javax.naming.NoInitialContextException: Cannot instantiate class: org.apache.activemq.jndi.ActiveMQInitialContextFactory [Root exception is java.lang.ClassNotFoundException: org.apache.activemq.jndi.ActiveMQInitialContextFactory]
Run Code Online (Sandbox Code Playgroud)
尽管如此,如果我在没有 maven 插件的情况下运行相同的 JMeter 计划测试,它会正确运行。
提供更多细节:
如果我以这种方式启动 ActiveMQ(Windows 命令提示符):
cd C:\_d\server\ActiveMQ\apache-activemq-5.9.0\bin
activemq.bat
Run Code Online (Sandbox Code Playgroud)
然后,我以这种方式运行完全相同的 jmx 脚本:
jmeter.bat -n -t C:\_d\scripts\JMeter\JMS_SQS_MVN\SQS.jmx -l C:\temp\activemq.jtl -j C:\temp\activemq.jmx.log
Run Code Online (Sandbox Code Playgroud)
我可以转到 ActiveMq 控制台并查看创建的标记为 sqs_dummy_mvn6 的队列,因为它出现在此类 jmx 脚本中。
现在,使用相同的脚本但通过 maven 插件,它只是显示“无法实例化类:org.apache.activemq.jndi.ActiveMQInitialContextFactory”。好吧,我明白,每当我使用 jmeter-maven-plugin 时,都会在验证阶段启动和停止嵌入式 jmeter 和嵌入式 activemq。
我通过 maven 调用相同的脚本:
mvn clean verify
Run Code Online (Sandbox Code Playgroud)
...\artproducer\src\test\jmeter\SQS.jmx(下面只启用发布者;禁用订阅以使测试更简单)
<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="2.9" jmeter="3.0 r1743807">
<hashTree>
<TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Plano de Teste SQS" enabled="true">
<stringProp name="TestPlan.comments"></stringProp> …Run Code Online (Sandbox Code Playgroud) 如何在 ActiveMQ 主题中使用 发布消息curl?我试过:
curl -XPOST -d "body=message" http://admin:admin@localhost:8161/api/message?destination=queue://orders.input
Run Code Online (Sandbox Code Playgroud)
但响应是未经授权的用户 401。