我必须报告发布到RabbitMQ的消息数量以及RabbitMQ在使用1天内消耗的消息数量.通过使用RabbitMQ提供的REST API,我能够在某个时间间隔获取统计信息的快照. 如何使用相同的REST API 24小时获取数据.
我有一个场景,其中有一个应用程序正在生成不同类型的有趣事件(而不是命令).生产者应用程序不关心事件的处理对象和方式.
我正在实施一个消费者,他会倾听一些已发布的事件并对其进行适当的处理.消费者应用程序想要检查发布者应用程序交换是否存在.那么,问题是如何通过使用弹簧提供的兔子/ AMQP库检查是否存在具有特定名称的交换?
我想,这可以通过尝试将队列绑定到不存在的交换来间接处理,从而导致异常.我正在寻找更好的方法来处理这种情况.
通过RabbitMQ发送大小约为1Mb的文件是一个好主意吗?我想以与文件对应的二进制字段的json格式发送消息。
以及如何使用spring-amqp正确地做到这一点?仅通过发布下一类的对象?
class Message {
String field1;
byte[] fileField1;
byte[] fileField2;
}
Run Code Online (Sandbox Code Playgroud) 我正在使用带有Spring的RabbitMQ构建一个应用程序:到目前为止一直很好.定义单元测试我使用JUnit定位外部服务器.我想知道的是,是否有办法模拟RabbitMQ服务器来执行测试,如果有一种方法,那就是最好的方法.
我发现了一些帖子,但它们是在2012年甚至之前制作的......也许有更新,更容易,更有效的东西!
提前致谢
我正在尝试从XML Spring amqp配置迁移到基于java的注释,因为它"更简单".不确定我做错了什么XML配置工作正常但java @Configurable抛出"引起:java.net.SocketException:连接重置"异常.
XML配置(完美运行):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- define which properties files will be used -->
<context:property-placeholder location="classpath:*.properties" />
<rabbit:connection-factory id="connectionFactory"
addresses='${rabbitmq.hostname}'
username='${rabbitmq.username}'
password='${rabbitmq.password}'
virtual-host='${rabbitmq.virtual_host}'
cache-mode='${rabbitmq.cache_mode}'
channel-cache-size='${rabbitmq.channel_cache_size}'/>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="3"/>
<property name="maxPoolSize" value="5"/>
<property name="queueCapacity" value="15"/>
</bean>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="${rabbitmq.queue_name}" />
<rabbit:topic-exchange name="${rabbitmq.topic_exchange_name}">
<rabbit:bindings>
<rabbit:binding queue="${rabbitmq.queue_name}" pattern="${rabbitmq.topic_exchange_pattern}"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<bean id="listener" class="com.my.package.path.worker.DefaultMessageListener"/>
<rabbit:listener-container id="listenerContainer" connection-factory="connectionFactory" task-executor="taskExecutor"> …
Run Code Online (Sandbox Code Playgroud) 我的问题实际上是一个后续问题
在那里它声明包装"你的监听器"并传入CountDownLatch并最终所有线程将合并.如果我们手动创建和注入消息监听器但是对于@RabbitListener注释,这个答案是有效的...我不知道如何传入CountDownLatch.该框架在幕后自动神奇地创建了消息监听器.
还有其他方法吗?
我正在使用带有RabbitMQ活页夹的Spring Cloud Stream.它适用于byte[]
有效负载和Java本机序列化,但我需要使用JSON有效负载.
这是我的处理器类.
@EnableBinding(Processor.class)
public class MessageProcessor {
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public OutputDto handleIncomingMessage(InputDto inputDto) {
// Run some job.
return new OutputDto();
}
}
Run Code Online (Sandbox Code Playgroud)
InputDto
并且OutputDto
是杰克逊注释的POJO.
我已经阅读了这个文档片段:
RabbitMQ 自动连接/拓扑恢复
从 Spring AMQP 的第一个版本开始,该框架提供了自己的连接和通道在代理失败的情况下恢复。此外,如第 3.1.10 节“配置代理”中所述,RabbitAdmin 将在重新建立连接时重新声明任何基础设施 bean(队列等)。因此,它不依赖于 amqp-client 库现在提供的自动恢复。Spring AMQP 现在使用 4.0.x 版本的 amqp-client,它默认启用自动恢复。如果您愿意,Spring AMQP 仍然可以使用自己的恢复机制,在客户端禁用它(通过将底层 RabbitMQ connectionFactory 上的 automaticRecoveryEnabled 属性设置为 false)。但是,该框架与启用的自动恢复完全兼容。
我不确定我是否正确理解。在我的application.properties
我已经定义了端口和主机。在启动我的 spring-boot 应用程序期间,它成功建立了连接和所有必要的 bean 以与队列通信。
但是,如果在启动期间我的应用程序代理关闭并且它将在应用程序启动五分钟后启动怎么办?是否spring-rabbitmq
设法重新连接并定义所有 bean?
java spring-integration spring-rabbit spring-amqp spring-boot
我有一个Spring启动应用程序,使用spring-boot版本1.5.16.RELEASE
,它将以下依赖项添加到我的应用程序:
[INFO] | +- com.rabbitmq:http-client:jar:1.0.0.RELEASE:compile
[INFO] | +- org.springframework.amqp:spring-rabbit:jar:1.7.10.RELEASE:compile
[INFO] | | +- org.springframework.amqp:spring-amqp:jar:1.7.10.RELEASE:compile
[INFO] | | \- com.rabbitmq:amqp-client:jar:4.8.1:compile
[INFO] +- org.springframework.boot:spring-boot-starter-amqp:jar:1.5.16.RELEASE:compile
Run Code Online (Sandbox Code Playgroud)
该应用程序还使用rabbitmq服务器3.6.6
作为消息代理.
我很少注意到我在应用程序中收到以下错误:
org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:67)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1208)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1197)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:562)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:545)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:515)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:497)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1300(CachingConnectionFactory.java:102)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1213)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1443)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1419)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:713)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:707)
...
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:899)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:790)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1381)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1294)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1550)
at …
Run Code Online (Sandbox Code Playgroud) 我尝试使用 TopicExchange 来屏蔽消息。
配置:
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue name="sample.queue"/>
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<rabbit:annotation-driven container-factory="rabbitListenerContainerFactory"/>
<rabbit:listener-container connection-factory="connectionFactory" />
Run Code Online (Sandbox Code Playgroud)
成分:
@Component
public class JmsComponent {
private final Logger log = LoggerFactory.getLogger(JmsComponent.class);
private final TopicExchange exchange = new TopicExchange("sample.exchange");
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue queue;
private String received;
public void send(String msg) {
rabbitTemplate.convertAndSend("sample.queue", new SimpleMessage(msg));
}
public void bindToKey(String keyMask) {
BindingBuilder.bind(queue).to(exchange).with(keyMask);
rabbitTemplate.setExchange(exchange.getName());
}
public void sendByKey(String …
Run Code Online (Sandbox Code Playgroud) spring-rabbit ×10
spring-amqp ×7
java ×4
rabbitmq ×4
spring ×3
spring-boot ×2
amqp ×1
amqp-client ×1
json ×1
junit ×1
spring-cloud ×1