我想创建一个java类,其唯一目的是检查ActiveMQ代理的状态(或连接到ActiveMQ代理,因为中断可能被定义为客户端也失去网络连接).
所以基本上会有一个线程在每隔几秒钟后运行以检查代理的状态,如果有经纪人关闭,我想做一些特定的邮件支持组和类似的任务.
在线示例不够详细,无法解释如何实现上述目标.
有人已经做过这个,或者可以建议一个很好的方法来实现这一点?
谢谢,Neeraj
目前,我们在ActiveMQ库之上使用一些自定义代码来进行JMS消息传递.我一直在寻求切换到Camel,易于使用,易于维护和可靠性.
使用我目前的配置,Camel的ActiveMQ实现比我们的旧实现要慢得多,无论是发送和接收的每条消息的延迟,还是发送和接收大量消息所花费的时间.我试过调整一些配置(例如最大连接),但无济于事.
我有两个应用程序,一个使用我们的旧实现,一个使用Camel实现.每个应用程序都将JMS消息发送到本地ActiveMQ服务器上的主题,并且还侦听有关该主题的消息.这用于测试两个场景: - 在循环中向主题发送100,000条消息,并查看从开始发送到结束处理所有这些消息所需的时间. - 每100毫秒发送一条消息,并测量从发送到处理每条消息的延迟(以ns为单位).
我是否可以根据发送到消息洪流的时间和个别消息的时间来改进下面的实现?理想情况下,改进将涉及调整我错过的一些配置,或建议更好的方法来做,而不是太hacky.将会赞赏对改进的解释.
编辑:既然我是异步发送消息,我似乎有一个并发问题.receivedCount没有达到100,000.查看ActiveMQ Web界面,排队100,000条消息,排队100,000条,因此消息处理方面可能存在问题.我已经改成receivedCount了一个AtomicInteger并添加了一些日志来帮助调试.这可能是Camel本身(或ActiveMQ组件)的问题,还是消息处理代码有问题?据我所知,只有~99,876条消息正在通过floodProcessor.process.
编辑:更新了异步发送和记录并发问题.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsConfiguration;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.log4j.Logger;
public class CamelJmsTest{
private static final Logger logger = Logger.getLogger(CamelJmsTest.class);
private static final boolean flood = true;
private static final int NUM_MESSAGES …Run Code Online (Sandbox Code Playgroud) 尝试使用BrokerFactory创建ActiveMQ代理时出现此错误:
java.io.IOException: Could not load failover factory:java.io.IOException: Could not find factory class for resource: META-INF/services/org/apache/activemq/broker/failover
at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:27)
at org.apache.activemq.broker.BrokerFactory.createBrokerFactoryHandler(BrokerFactory.java:43)
at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:70)
at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)
at ...
Caused by: java.io.IOException: Could not find factory class for resource: META-INF/services/org/apache/activemq/broker/failover
at org.apache.activemq.util.FactoryFinder$StandaloneObjectFactory.loadProperties(FactoryFinder.java:96)
at org.apache.activemq.util.FactoryFinder$StandaloneObjectFactory.create(FactoryFinder.java:58)
at org.apache.activemq.util.FactoryFinder.newInstance(FactoryFinder.java:146)
at org.apache.activemq.broker.BrokerFactory.createBrokerFactoryHandler(BrokerFactory.java:41)
... 5 more
Run Code Online (Sandbox Code Playgroud)
它似乎是路径错误或类似的东西,我只是无法找出根本原因.
这是导致它的代码:
URI brokerUri = new URI(bean.getBrokerConfigUrl());
broker = BrokerFactory.createBroker(brokerUri);
Run Code Online (Sandbox Code Playgroud)
这是我打电话的网址:
failover:(tcp://internalUrl.net:port#,tcp://internalUrl.net:port#)?randomize=false&timeout=30000&jms.redeliveryPolicy.maximumRedeliveries=-1&jms.prefetchPolicy.all=0
Run Code Online (Sandbox Code Playgroud)
这可能是一个路径错误吗?我不包括一个必需的罐子吗?网址格式不正确吗?我迷失在这里.
编辑:添加赏金
JMS主题和JMS选择器是完全不同的概念,但它们都可以被消费者使用以仅获取消息的子集.
在PubSub场景中,使用以下方法过滤消息的专业人员和骗子是什么?
选项1)在单个主题中发布所有内容,使用者使用JMS选择器
选项2)订阅者订阅一个或多个主题.
例如:
我正在发布客户端数据消息,订阅者可能只想获得特定类型的客户端.客户端类型在不同的属性中定义,如'ClientGroup''ClientSource''ClientOrgUnit''ClientSize'等
我该怎么办:
1)为每个客户端类型创建一个不同的主题
或
2)在主题上并让每个类型都在JMS属性中并让订阅者使用选择器?
谢谢
我正在为java(gozirra,stompj,activemq)尝试几个stomp库.所有这些库都有很差的文档,例如只有一个例子,我有一个严重的问题:
我需要SSL支持.
stomp + ssl协议存在并由activemq支持,但我无法找到支持它的Java客户端.
配置:Redhat 5.3,Sun Java 1.6.31,ActiveMQ 5.4.2
我每次停止activemq都会收到异常($> service activemq stop)
ACTIVEMQ_HOME: /opt/apache/apache-activemq-5.4.2
ACTIVEMQ_BASE: /opt/apache/apache-activemq-5.4.2
Connecting to JMX URL: service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
ERROR: java.lang.RuntimeException: Failed to execute stop task. Reason: java.io.IOException: Failed to retrieve RMIServer stub: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: localhost; nested exception is:
java.net.ConnectException: Connection refused]
java.lang.RuntimeException: Failed to execute stop task. Reason: java.io.IOException: Failed to retrieve RMIServer stub: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: localhost; nested exception is:
java.net.ConnectException: Connection refused]
at org.apache.activemq.console.command.ShutdownCommand.runTask(ShutdownCommand.java:107)
at …Run Code Online (Sandbox Code Playgroud) 我ActiveMQ用来发送消息.
所以当我发送消息时,消息就会收到消息.成功插入后,即可确认.
但我确认后会有代码,可以抛出NullPointerException.
因此,为了故意产生这种异常,我已经抛出了NullPointerException.所以当它这样做时:
消息不是dequeued,并且相同的消息再次出现在该onMessage函数中.
我的代码是:
public void onMessage(Message message) {
String msg = null;
try
{
msg = receiveMessage(message);
// Other code to insert message in db
message.acknowledge();
if(true)
{
throw new NullPointerException("npe"));
}
** // Other code which might produce a null pointer exception **
}
catch(Exception ex)
{
}
}
Run Code Online (Sandbox Code Playgroud)
为什么消息会再次onMessage()发挥作用,因为我acknowledge()也有.
因为我已经在db中插入了消息.
队列中的消息是否会被删除acknowledge()?
我怎么能做到这一点?
编辑:改述问题:
我想使用ActiveMQ作为我的服务器和客户端应用程序之间的信使服务.
我试图在服务器中设置一个嵌入式代理(即不是一个单独的进程)来处理生成的消息供我的客户端使用.此队列是持久的.
经纪人初始化如下:
BrokerService broker = new BrokerService();
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter();
adaptor.setDirectory(new File("activemq"));
broker.setPersistenceAdapter(adaptor);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616");
broker.start();
Run Code Online (Sandbox Code Playgroud)
经过修修补补后,我最终得到的服务器部分是:
public static class HelloWorldProducer implements Runnable {
public void run() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); // apparently the vm part is all i need
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " …Run Code Online (Sandbox Code Playgroud) 我有一个使用ActiveMQ版本5.10的Spring JMS应用程序.我正在执行简单的并发测试.我使用Spring Boot,当前版本和注释来配置JMSListener和消息生成器.
消息生成器只是尽可能快地在队列上抛出消息.消息侦听器将消息从队列中拉出,但在获取消息后休眠1秒钟 - 模拟消息侦听器在获取消息后需要执行的一些工作.
我将JMSListener设置为100-1000个并发线程.如果我开始在同一时间的消息生产者和消费者(无论是在自己的JVM上运行),消费者从来没有得到最低配置的螺纹上方,尽管最大范围设为1000.
如果我让生产者首先开始并在队列上放置几千条消息,然后启动一个或多个消费者实例,它将稳定地提升线程,从100开始,然后每秒20个左右的线程,直到达到状态队列中有大约20-30条消息在飞行中.它永远不会捕获生成器 - 即使消费者没有接近其maxConcurrency计数,也总会有一些消息在队列中.
为什么消息使用者没有突然进入一堆额外的线程来清空队列而不是让队列中有20-30条消息呢?消费者是否有办法继续快速添加线程以便赶上队列中的消息?
以下是代码的相关部分.
消息制作者
@Component
public class ClientServiceImpl implements ClientService {
private static final String QUEUE="message.test.queue";
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void submitMessage(ImportantMessage importantMessage) {
System.out.println("*** Sending " + importantMessage);
jmsTemplate.convertAndSend(QUEUE, importantMessage);
}
}
Run Code Online (Sandbox Code Playgroud)
消息消费者
@SpringBootApplication
@EnableJms
public class AmqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(AmqConsumerApplication.class, args);
}
@Value("${JMSHost}")
private String JMS_BROKER_URL;
@Autowired
static Command command;
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactory factory= new ActiveMQConnectionFactory(JMS_BROKER_URL);
((ActiveMQConnectionFactory)factory).setTrustAllPackages(true);
((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
((ActiveMQConnectionFactory)factory).setAlwaysSessionAsync(false); …Run Code Online (Sandbox Code Playgroud) 我有spring的配置和一个完整的功能stomp代理(activemq):
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
private static Logger LOG = org.slf4j.LoggerFactory.getLogger(WebsocketConfig.class);
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic/", "/queue/");
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/socket").withSockJS();
}
}
Run Code Online (Sandbox Code Playgroud)
天真地,我虽然spring使用了我当前的activemq配置,但实际上它尝试使用默认的stomp端口连接到localhost中的服务器.我发现可以通过输入以下内容来更改此配置:
config.enableStompBrokerRelay("/topic/", "/queue/")
.setRelayHost("activeMQHOST")
.setRelayPort(9999);
Run Code Online (Sandbox Code Playgroud)
多数民众赞成,但目前我有两个经纪人的故障转移设置(master/flave with shared file system).如何为stomp broker relay配置这样的设置?
如果不可能,我想在以下解决方案中:
第二种选择是可取的吗?
activemq-classic ×10
java ×6
jms ×4
java-ee ×2
spring ×2
stomp ×2
apache-camel ×1
concurrency ×1
eclipse ×1
ibm-mq ×1
jar ×1
jmx ×1
listener ×1
pathing ×1
performance ×1
ssl ×1
websocket ×1