我是activeMQ 的新手。我需要编写代码来获取所有队列并读取消息。我没有找到像 get all Queues 这样的 API。如何从ActiveMQ读取队列。如果可能,一些示例会有所帮助。
我正在寻找一个ActiveMQ经纪人admin命令,告诉它暂停队列-即:
我找不到这样的命令。最常见的答案是应该在客户端对其进行管理-即找到每个消费者并停止它。其他答案是解决方法,例如操纵网络路由或防火墙,以便客户端和代理不再通信。
对其他消息队列的粗略调查表明,ActiveMQ在这方面并不罕见。
在我看来,可能无法实现此功能有两个原因:
是什么,为什么?
我在 windows xp 上运行 apachemq。我没有运行 Web 服务器服务或任何类型的数据库,但是一旦我启动活动 mq,我就会不断收到此错误
错误 | 无法启动 Apache ActiveMQ ([localhost, ID:computer_1-3725-13902958 73141-0:1], java.net.URISyntaxException: Illegal character in hostname at index 13: ws://computer_1:61614?maximumConnections=1000&wireFormat.maxFrameSize =104857 600)
这是完整的日志
C:\apache_activemq>.\bin\activemq
Java Runtime: Oracle Corporation 1.7.0_21 C:\Program Files\Java\jdk1.7.0_21\jre
Heap sizes: current=1013632k free=996854k max=1013632k
JVM args: -Dcom.sun.management.jmxremote -Xms1G -Xmx1G -Djava.util.logging.c
onfig.file=logging.properties -Dhawtio.realm=activemq -Dhawtio.role=admins -Dhaw
tio.rolePrincipalClasses=org.apache.activemq.jaas.GroupPrincipal -Djava.security
.auth.login.config=C:\apache_activemq\bin\..\conf\login.config -Dactivemq.classp
ath=C:\apache_activemq\bin\..\conf;C:\apache_activemq\bin\../conf;C:\apache_acti
vemq\bin\../conf; -Dactivemq.home=C:\apache_activemq\bin\.. -Dactivemq.base=C:\a
pache_activemq\bin\.. -Dactivemq.conf=C:\apache_activemq\bin\..\conf -Dactivemq.
data=C:\apache_activemq\bin\..\data -Djava.io.tmpdir=C:\apache_activemq\bin\..\d
ata\tmp
Extensions classpath:
[C:\apache_activemq\bin\..\lib,C:\apache_activemq\bin\..\lib\camel,C:\apache_a
ctivemq\bin\..\lib\optional,C:\apache_activemq\bin\..\lib\web,C:\apache_activemq
\bin\..\lib\extra]
ACTIVEMQ_HOME: C:\apache_activemq\bin\..
ACTIVEMQ_BASE: C:\apache_activemq\bin\..
ACTIVEMQ_CONF: C:\apache_activemq\bin\..\conf
ACTIVEMQ_DATA: C:\apache_activemq\bin\..\data
Loading message …Run Code Online (Sandbox Code Playgroud)
当我在处理Message-Queue时,遇到静态队列和动态队列这个词.
任何人都可以告诉我区别吗?
在使用“MySQL”(http://activemq.apache.org/jdbc-support.html)实现持久化之后
我发现下一个问题:
文件配置:--> activemq.xml
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic …Run Code Online (Sandbox Code Playgroud) 我正在尝试遵循 ActiveMQ 教程。使用activemq-admin.bat start从命令提示符启动 ActiveMQ 后, 我尝试访问http://localhost:8161/admin/ 以创建新队列,但它抛出了一个错误。(我可以访问http://localhost:8161/)
HTTP ERROR: 401
Problem accessing /admin/index.jsp. Reason:
Unauthorized
Run Code Online (Sandbox Code Playgroud)
任何人都可以请给我建议任何解决方案吗?
我有一条骆驼路线,如下所示
from("activemq:queue:upload" )
.pollEnrich().simple("file:basePath/${header.ID}?noop=true&recursive=true")
.aggregationStrategy(new ExampleAggregationStrategy())
.timeout(2000)
.toD("ftp:${header.destinationURI}")
Run Code Online (Sandbox Code Playgroud)
在我的文件系统中file:basePath/${header.ID}包含多个文件夹。执行上述路由时,只会将第一个文件夹中的第一个文件复制到 ftp 服务器。剩余的文件夹(带子文件夹)没有被复制到 ftp 服务器!
而ExampleAggregationStrategy()类的aggregate()方法看起来像以下
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
String destinationURI = "populatedURI";
oldExchange.setOut(newExchange.getIn());
oldExchange.getOut().setHeader("ID", oldExchange.getIn().getHeader("ID"));
oldExchange.getOut().setHeader("destinationURI", destinationURI);
oldExchange.setFromEndpoint(newExchange.getFromEndpoint());
oldExchange.setFromRouteId(newExchange.getFromRouteId());
return oldExchange;
}
Run Code Online (Sandbox Code Playgroud)
我也试过设置properties and onCompletions。仍然没有运气!我错过了什么aggregationStrategy吗?
如何成功复制所有文件和文件夹pollEnrich?
我正在开发一个使用 ActiveMQ 和 Play Framework v2.4.2(Java 版)向最终用户发送电子邮件的消息传递系统。我是 JMS/ActiveMQ 技术的新手。我刚刚在 ActiveMQ 站点上使用了这个 Hello World 示例作为起点。
我创建了一个如下的测试类来测试使用 Play Framework 运行 ActiveMQ,一切正常:
public class ActiveMQMailApp {
public static void main(String[] args) throws Exception {
setup();
MailConsumer.initService();
for (int i =0;i<11;i++) MailProducer.sendMail(fakeMail());
}
public static void setup(){
FakeApplication fakeApplication = Helpers.fakeApplication();
Helpers.start(fakeApplication);
}
private static Mail fakeMail() throws InterruptedException {
Thread.sleep(1000);
SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd hh:mm:ss");
return new Mail( "noreply@abc.com", "receiver@gmail.com", "A Test Email", "<html><body><p>Date: <b> "+sdf.format(new Date())+" </b></p></body></html>");
}
} …Run Code Online (Sandbox Code Playgroud) 根据http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#acknowledge()
客户端可以在消费每条消息时分别对其进行确认,也可以选择将消息确认为应用程序定义的组(通过在组中最后收到的消息上调用确认来完成,从而确认会话消耗的所有消息。 )
如何在ActiveMQ中做到这一点?我无法使其工作。
尝试通过amqp10使用 AMQP 1.0 协议连接到我的 Apache ActiveMQ 代理。我正在使用以下代码(改编自自述文件中的原始示例):
const AMQPClient = require("amqp10").Client;
const Promise = require("bluebird");
//Fix from: https://github.com/noodlefrenzy/node-amqp10/issues/241
const activeMQPolicy = require("amqp10").Policy;
const client = new AMQPClient(activeMQPolicy.ActiveMQ);
const setUp = () => {
return Promise.all([
client.createReceiver("amq.topic"),
client.createSender("amq.topic")
]);
};
client.connect("amqp://localhost")
.then(setUp)
.spread(function (receiver, sender) {
receiver.on("errorReceived", function (err) {
if (err) {
console.log(`failed with error: ${err}`);
return;
}
receiver.on("message", message => console.log(`Rx message: ${message.body}`));
return sender.send({ key: "Value" });
});
})
.error( err => console.log("error: ", err)); …Run Code Online (Sandbox Code Playgroud)