我正在尝试使用 MySql 配置 Activemq。一切似乎都已正确配置,因为我能够通过代码发送和接收消息。但我无法在 Activemq Web 控制台中看到任何队列或消息信息。
下面是我的 activemq 配置。任何帮助表示赞赏。
谢谢你!
Activemq.xml:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<broker useJmx="false" brokerName="jdbcBroker" xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/activemq-data" dataSource="#mysql-ds"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector name="default" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="admin"/>
<property name="password" value="root"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
<import resource="jetty.xml"/>
</beans>
Run Code Online (Sandbox Code Playgroud) 我有一个超级简单的场景:一名经纪人和一名具有持久订阅的消费者。这是我的消费者应用程序的代码:
package test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import pojo.Event;
import pojo.StockUpdate;
public class Consumer
{
private static transient ConnectionFactory factory;
private transient Connection connection;
private transient Session session;
public static int counter = 0;
public Consumer(String brokerURL) throws JMSException
{
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.setClientID("CLUSTER_CLIENT_1");
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void close() throws JMSException
{
if (connection …Run Code Online (Sandbox Code Playgroud) 在我的消息队列中包含几条消息。我想按特定属性进行分组。这些都有一个自定义属性“item-id”
我现在将某些价值观定为:体育、电影......
我这样做是这样的:
new MessageCreator() {
ObjectMessage message = session.createObjectMessage();
message.setObject(data);
message.setStringProperty("item-id", "sports");
}
Run Code Online (Sandbox Code Playgroud)
该值也显示在属性的队列中。
当我尝试检索包含此属性的消息时,我没有得到任何结果。
初审:
Connection con = pc.createConnection();
Session sess =con.createSession(false,Session.AUTO_ACKNOWLEDGE);
AmqMessagesQueryFilter queryFilter = new AmqMessagesQueryFilter(pc, queue);
con.start();
String selector = "item-id = 'sports'";
List messages = queryFilter.query(selector);
Run Code Online (Sandbox Code Playgroud)
但列表是空的
二审:
Connection con = pc.createConnection();
Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
AmqMessagesQueryFilter queryFilter = new AmqMessagesQueryFilter(pc, queue);
QueueBrowser queueBrowser = sess.createBrowser(queue,"item-id = 'sports'");
con.start();
List messages=Collections.list(queueBrowser.getEnumeration());
Run Code Online (Sandbox Code Playgroud)
该列表也是空的。
我的错误是什么?
我从未配置过 activemq,我只是使用 TomEE Plus 1.7.1 中的完全默认配置,并且它运行良好。我使用 JMS 进行异步电子邮件发送。现在我想使用调度程序支持,如下所示:
...
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,
delayMinute * 60000);
...
Run Code Online (Sandbox Code Playgroud)
但消息并没有延迟。我读到 SchedulerSupport 应该在 xml 文件(可能是 activemq.xml (?))中打开,但我没有这样的文件。
我在 tomee.xml 中尝试了这个,但它不起作用:
<Resource id="myActiveMQResourceAdapter" type="ActiveMQResourceAdapter">
schedulerSupport = true
</Resource>
Run Code Online (Sandbox Code Playgroud)
我应该将 SchedulerSupport = true 配置属性放在哪里?我不想宣布新的经纪人或任何其他事情。只是我想使用默认设置加上调度程序支持。
我缺少什么?
AMQ 版本 5.13.2 Java 1.8.0_74 Windows 10
给定一个简单的测试用例,传输两个对象消息,一个包含数据,另一个是数据结束标记。仅接收数据结束标记。
队列在作业开始时创建,并在作业完成后销毁。
如果我运行大量交易,我会看到大约 50% 的接收率。
日志清楚地显示接收器在第一条消息放入队列之前启动,两条消息都被放入队列,但实际上只接收第二条消息。
发送方和接收方都位于同一个 JVM 上。每个都有自己的会话和连接。
连接和队列设置代码:
@Override
public void beforeJob(JobExecution jobExecution) {
// TODO Auto-generated method stub
try {
jobParameters = jobExecution.getJobParameters();
readerConnection = connectionFactory.createConnection();
readerConnection.start();
writerConnection = connectionFactory.createConnection();
writerConnection.start();
jmsQueueManagementSession = writerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queueName = jobParameters.getString("jobName") + "." + jobExecution.getId();
queue = jmsQueueManagementSession.createQueue(getQueueName());
} catch (JMSException ex) {
throw new MaxisRuntimeException(
MaxisCodeHelperImpl.generateCode("MXAR", MXMODULE, JMS_RECEIVER_INITIALIZATION_ERROR), null);
}
}
Run Code Online (Sandbox Code Playgroud)
发送者设置代码:
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution; …Run Code Online (Sandbox Code Playgroud) JMS 消息 ID 如下所示ID:10.77.42.209-4280-1477454185311-1:1:1391:1:1。整个字符串被“-”和“:”分成几个部分。
显然,一部分代表生产者的IP地址,一部分可能代表消息ID。但别人有什么办法呢?
所以我的问题是每个部分的含义是什么?
使用ActiveMQ并且没有自定义消息ID
我使用 spring-boot-starter-data-jpa 和 spring-boot-starter-activemq 制作了一个 POC。我想在提交 jpa 事务时将 jms 消息推送到代理(activeMQ)上。
我的代码:UtilsateurService 具有“主要”事务:
@Service
public class UtilisateurService {
@Autowired
private UtilisateurRepository utilisateurRepository;
@Autowired
private SendMessage sendMessage;
@Transactional(rollbackOn = java.lang.Exception.class)
public Utilisateur create(Utilisateur utilisateur) throws Exception {
final Utilisateur result = utilisateurRepository.save(utilisateur);
sendMessage.send("creation utilisateur : " + result.getId());
throw new Exception("rollback");
//return result;
}
}
Run Code Online (Sandbox Code Playgroud)
SendMessage 类女巫“管理”Jms 消息:
@Component
public class SendMessage {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Value("${jms.queue.destination}")
private String destinationQueue;
public void send(String msg) {
this.jmsMessagingTemplate.convertAndSend(destinationQueue, msg);
}
}
Run Code Online (Sandbox Code Playgroud)
我的主要课程:
@SpringBootApplication …Run Code Online (Sandbox Code Playgroud) ActiveMQ 中的持久主题(这似乎是 JMS 本身的一个障碍)似乎只有一个使用者可以在订阅者上处于活动状态。
也就是说,在 ActiveMQ 文档中:
JMS 持久订阅者 MessageConsumer 是使用唯一的 JMS clientID 和持久订阅者名称创建的。要与 JMS 兼容,在任何时间点只能为一个 JMS clientID 激活一个 JMS 连接,并且对于一个 clientID 和订阅者名称只能激活一个使用者。即,只有一个线程可以从给定的逻辑主题订阅者中主动消费。
但是,其他排队系统(根据文档,Azure 服务总线似乎是这样做的)似乎很容易在单个“订阅”上允许多个线程“订阅者”。在这个时代,人们会认为这是理所当然的。
为什么会这样?这是否会在 JMS 和/或 ActiveMQ 的未来版本中得到解决?
PS,对于这种情况,“虚拟主题”(上面引用的文档)似乎不是一种理想且性能不佳的解决方法,因为它似乎在后台为每个订阅者创建了一个完全独立的队列。
这是我的配置:
@Bean
ActiveMQConnectionFactory activeMQConnectionFactory() {
String url = this.environment.getProperty("jms.broker.url");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(url);
connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
return connectionFactory;
}
@Bean
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(5);
return redeliveryPolicy;
}
.....
Run Code Online (Sandbox Code Playgroud)
这是我的消费者:
@Service("msgConsumer")
public class MessageConsumer {
private static final String ORDER_RESPONSE_QUEUE = "thequeue.Q";
@JmsListener(destination = ORDER_RESPONSE_QUEUE, containerFactory = "jmsListenerContainerFactory")
public void receiveMessage(final Message<String> message) throws Exception {
MessageHeaders headers = message.getHeaders();
LOG.info("Application : headers received : {}", headers);
String response = message.getPayload();
LOG.info("Application …Run Code Online (Sandbox Code Playgroud) 我在 Amazon MQ 上创建了一个代理,并在端口 61617 上获得了一个 SSL 端点。我也在寻找一个非 SSL 端点(就像我们可以在 Active MQ 上的 61616 上拥有 tcp)。Amazon MQ 是否仅提供 SSL?无论如何,我们也可以获得TCP端点吗?
activemq-classic ×10
jms ×6
java ×4
amazon-mq ×1
apache-tomee ×1
dead-letter ×1
mom ×1
mysql ×1
scheduler ×1
spring-batch ×1
spring-boot ×1
spring-jms ×1