我有一个完全通过java代码配置的嵌入式代理(没有可部署的xml文件).当我将浏览器指向Web控制台时,我只得到一个404,并且lsof显示没有任何内容正在监听8161.是否可以仅使用带有Java代码的Web控制台?我使用ActiveMQ附带的网络服务器很好,我对8161端口很好.
我正在尝试使用JNDI创建与ActiveMQ的简单连接.
队列名为'example.A'.
根据触及JNDI的ActiveMQ文档,如果我想通过JNDI使用ConectionFactories和Queues(Topics),我必须将jndi.properties文件放在我的类路径上.据我所知,默认情况下,activeMQ类路径是%activemq%/ conf目录.我没有改变它.所以我的队列有这个属性:
queue.MyQueue = example.A
我为ActiveMQ创建了java客户端类,它使用JNDI如下:
Properties jndiParameters = new Properties() ;
jndiParameters.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
jndiParameters.put(Context.PROVIDER_URL, "tcp://localhost:61616");
Context context = new InitialContext(jndiParameters);
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("ConnectionFactory");
Queue queue = (Queue) context.lookup("MyQueue");
Run Code Online (Sandbox Code Playgroud)但它找不到我的队列,它抛出异常:javax.naming.NameNotFoundException:MyQueue
我的错误在哪里?
我正在使用ActiveMQ中的系统,我真的不想丢失消息.我的问题是重试消息导致我的消费者阻止(而不是处理他们可以处理的消息).我想在几天内给失败的消息重试(例如,我的一个潜在目的地是我将通过SFTP访问的另一台服务器,可能会关闭),但我不希望消费者阻塞几天 - - 我希望它继续处理其他消息.
有没有办法让经纪人稍后重新发送消息?现在我正在考虑将消息从队列中取出并延迟启动,但我想知道是否有更简单的方法.我正在使用Apache Camel,所以使用它的解决方案也会很好.
我有一个用Java编写的单线程ActiveMQ使用者.我要做的就是从队列中接收()一个消息,尝试将其发送到Web服务,如果成功则确认()它.如果Web服务调用失败,我希望消息保留在队列中并在超时后重新发送.
除了重新发送部分之外,它或多或少都在工作:每次重新启动我的消费者时,它会为仍然在队列中的每个消息收到一条消息,但是在发送它们之后,消息永远不会被重新发送.
我的代码看起来像:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true); // ????
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
destination = session.createQueue(subject); //???
consumer = session.createConsumer(destination);
//consumer.setMessageListener(this); // message listener had same behaviour
}
private void process() {
while(true) {
System.out.println("Waiting...");
try {
Message message = consumer.receive();
onMessage(message);
} catch (JMSException e) {
e.printStackTrace();
}
try {
Thread.sleep(500);
} …Run Code Online (Sandbox Code Playgroud) 我的系统有以下几个部分:
到目前为止,在开发、测试和生产环境中一切正常。
我们刚刚在生产中连接了一个新的客户端系统,我们注意到它的日志开始报告“通道不活动太久”异常并断开连接。担心这个客户端的整体影响是它停止了代理上的所有消息消耗,从而使整个系统停止。
此客户端侦听器(使用 Spring 缓存连接工厂)似乎可以正常连接到 JMS 代理,处理一些消息,然后 3 分钟报告异常。在 ActiveMQ 中打开 DEBUG 并获得大量输出,但没有任何迹象表明同时在代理上出现警告或错误。
相信 ActiveMQ 有一些内部保持活动,即使不活动时间超过默认 30 秒,也应该保持连接。
基础设施人员已经监控了这个客户端的 VPN,并确认它一直保持连接状态。
不要相信代码或 Spring 配置有问题,因为我们在不同的客户端中有许多其他的侦听器实例,并且它们的行为都很好。
假设我真的有两个问题:
编辑 - 添加异常堆栈跟踪:
2013-04-24 14:02:06,359 WARN - Encountered a JMSException - resetting the underlying JMS Connection (org.springframework.jms.connection.CachingConnectionFactory)
javax.jms.JMSException: Channel was inactive for too (>30000) long: jmsserver/xxx.xx.xx.xxx:61616
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49) …Run Code Online (Sandbox Code Playgroud) 我阅读了有关的文档prefetch buffer.按照我的理解,如果我将Prefetch值= 1 分配给消费者A.一次将Activemq push 1消息发送给A .once A向activemq发送确认,然后只有activemq将另一条消息发送给A.
我怀疑的是,我需要为消费者分配预取值.
我需要在消费者程序中分配预取值.如果它是正确的,你能用简单的代码解释.
谢谢.
我正在使用 apache 的 activemq 进行排队。在向队列写入内容时,我们开始经常看到以下异常:
Caused by: org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed:
at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:282)
at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:271)
at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1366)
Run Code Online (Sandbox Code Playgroud)
我无法弄清楚是什么导致了这种情况 - 甚至坦率地说,从哪里开始调试导致这种情况的原因。
这是队列设置代码:
camelContext = new DefaultCamelContext();
camelContext.setErrorHandlerBuilder(new LoggingErrorHandlerBuilder());
camelContext.getShutdownStrategy().setTimeout(SHUTDOWN_TIMEOUT_SECONDS);
routePolicy = new RoutePolicy();
routePolicy.setCamelContext(camelContext);
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(queueUri);
// use a pooled connection factory between the module and the queue
pooledConnectionFactory = new PooledConnectionFactory(connectionFactory);
// how many connections should there be in the session pool?
pooledConnectionFactory.setMaxConnections(this.maxConnections); …Run Code Online (Sandbox Code Playgroud) 任何人都可以解释 ActiveMQ 重新交付策略实际上是如何工作的吗?它是在客户端还是服务器端工作?
假设我有一个重新传递策略,最多可以重新传递 10 分钟的消息,每次尝试之间间隔 30 分钟,那么失败的消息究竟在哪里?
假设消息现在失败并且必须在 30 分钟后重新发送,那么消息在哪里?
我看到同一个消费者在 30 分钟后收到消息。
我想知道 ActiveMQ 在哪里存储此消息 30 分钟。
如果我关闭了消费者或我的消费者崩溃了,我能在 30 分钟后恢复消息吗?
我浏览了这些 ActiveMQ 页面以了解重新交付策略,但没有找到任何信息:
http://activemq.apache.org/redelivery-policy.html http://activemq.apache.org/message-redelivery-and-dlq-handling.html
我面临着架构假设的问题。小背景:我正在编写一个游戏服务器,它可以为指定的玩家创建房间,并且在该房间(websocket)内,服务器正在侦听任何事件(例如玩家丢失、互联网断开等)。整体性将是可扩展的,因此将在一些 LB 监督下部署多个 Tomcat 节点。目前我已经弄清楚了这个过程(还记得关于 tomcat 集群):
客户端通过 REST API 登录到 GameServer,会话保存在所有节点之间共享的 RedisDB 中。服务器返回 HTTP 200。
客户已登录系统,他决定搜索游戏。调用“/search/game”端点。
2.1. 系统处理用户请求并将播放器添加到 RedisDB 的临时队列(下一个或同一实例现在不确定),并向客户端返回 HTTP 200。
一个特定节点上的调度程序每 10-20 秒运行一次。并查看 RedisDB 是否有任何记录,如果是真的,那么服务器会一直将玩家并列并为他们创建游戏室,直到 RedisDB 为空或有奇怪的玩家。
我不确定如何在 websocket 中并置特定的播放器并使其对其他节点可见。我用谷歌搜索了一段时间,发现了一个类似的例子
堆栈:tomcat 集群中的 Spring Websocket
用户 Thanh Nguyen Van 提到了通过多个 tomcat 实例处理 websockets 流量的两种方法,所以我决定使用 Apache ActiveMq JMS 和 Apache Camel 作为处理 websockets 流量的解决方案。
我已经在沙盒应用程序上成功实现了它,其中 3 个不同的用户可以登录系统并相互发送私人消息。整体搭配来实现ActiveMq,Camel,STOMP,SockJS和Spring Boot。很公平……但这是我坚持的部分。我觉得我已经得到了我需要的所有积木,但我不能把它们放在一起。
首先,我不确定给定的过程是否适合提出的问题。我正在考虑points 2 and 3,而不是调用REST endpoint和添加玩家RedisDB …
我有一个单一的Java应用程序,围绕@Service我的业务服务层的Spring bean 构建.通常,我的每个业务服务方法都有Spring Security注释(例如@PreAuthorize)来为该操作强制执行适当的授权规则.
在主Web应用程序流程中,这非常有效; 每个Web请求隐式地具有由会话cookie等处理的认证.
但是,当涉及到与其他"内部"系统的各种集成点时,我没有看到清晰的解决方案.
例如,我将从JMS队列中消耗方法,该队列已经在代理中定义了自己的身份验证和授权规则,因此我想隐含地"信任"我得到的消息.但是,就像现在的情况一样,一个简单的Camel路线如下:
WidgetService widgetService = lookup(WidgetService.class);
from("activemq:newWidget")
.unmarshall(...)
.bean(widgetService, "newWidget");
Run Code Online (Sandbox Code Playgroud)
最后扔了一个AuthenticationCredentialsNotFoundException.
这告诉我Camel正在正确地调用我的bean,并且从Spring应用了所有神奇的AOP.
对于其他类似的事情,我已经围绕系统的入口点(例如,围绕Quartz Job的execute方法)应用AOP建议,这注入了一个PreAuthenticatedAuthenticationToken,但我不确定这是否真的是最好的方法.
我是否应该继续在建议中包含这些"可信"入口点以添加身份验证上下文,或者我是否应该更改我的服务层以使某些业务方法的特殊形式不需要身份验证,并确保我明确记录他们不是用于网络@Controller方法等?