我正在尝试使用rabbitmq 获得一个基本的队列系统,但是当我尝试使用线程时,它似乎只运行了 1 个线程。
我的代码:
import pika
import threading
rabbit_url = "amqp://user:pass!@127.0.0.1:5672/%2f"
def start(max_threads):
for i in xrange(max_threads):
t = threading.Thread(target=run)
t.start()
t.join()
def run():
connection = pika.BlockingConnection(pika.URLParameters(rabbit_url))
channel = connection.channel()
channel.basic_consume(callback,
queue='docketq',
no_ack=True)
channel.start_consuming()
def callback(ch, method, properties, body):
do_work(body)
def do_work(body):
print body
Run Code Online (Sandbox Code Playgroud) 我知道之前有人问过这个问题,但我找不到任何合适的解决方案来解决我的问题,因此再次发布。
我正在尝试使用 spring-amqp 编写一个 rabbitmq 生产者,以在直接队列上发布我的消息。由于它是生产者应用程序,因此必须创建我的交换和队列。此外,我没有定义任何侦听器 bean,因为我在生产者端不需要它。下面是我的rabbitmq-config.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.5.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Spring AMQP Admin -->
<rabbit:admin id="amqpAdmin"
connection-factory="rabbitmqConnectionFactory"
auto-startup="true" />
<!-- Spring AMQP Template -->
<rabbit:template id="amqpTemplate"
connection-factory="rabbitmqConnectionFactory"
routing-key="my.key" exchange="my.exchange"
channel-transacted="true"/>
<!-- Connection Factory -->
<rabbit:connection-factory id="rabbitmqConnectionFactory"
host="${rabbitmq.host:localhost}"
port="${rabbitmq.port:5672}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.vhost:/}"
cache-mode="CONNECTION"
channel-cache-size="${rabbitmq.channel-cache-size:25}" />
<!-- Queue and Exchange -->
<rabbit:queue id="my.queue" durable="false"
auto-declare="true" auto-delete="true" />
<rabbit:direct-exchange name="my.exchange" durable="false"
auto-declare="true" auto-delete="true" declared-by="amqpAdmin">
<rabbit:bindings>
<rabbit:binding queue="my.queue" key="my.key"/>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
Run Code Online (Sandbox Code Playgroud)
我只有一个简单的生产者类,它使用上面的 …
我成功设置了一个 Topic Exchange,并且能够一次向多个消费者发送消息。
我还想向竞争的消费者传递消息并继续使用主题交换。我读到使用相同的队列名称可以让消费者竞争消息。但是,我可能会弄错,因为我无法让它发挥作用。
为同一主题的多个侦听器设置:
如何将竞争消费者设置为同一主题?
主题交换甚至可能吗?
谢谢。
我在 PHP 脚本中有一个消费者工作者。
但是有时候RabbitMQ服务器会停止运行,
我收到此错误:
PHP Fatal error: Uncaught exception 'ErrorException' with message 'fwrite(): send of 19 bytes failed with errno=32 Broken pipe' in /home/user/pusher/rabbitmq-worker/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:281
Run Code Online (Sandbox Code Playgroud)
我希望能够使用 try/catch 块来处理这个错误,以向我的控制台抛出一个很好的错误,或者在一段时间后尝试重新连接。
这是我到目前为止的代码:
while (true) {
try {
$connection = new AMQPStreamConnection(RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USER, RABBITMQ_PASS);
$channel = $connection->channel();
$channel->queue_declare(RABBITMQ_DT_QUEUE, false, true, false, false);
$channel->basic_qos(null, 11, null);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($req) {
sleep(3);//Some task that takes 3 sec
};
$channel->basic_consume(RABBITMQ_QUEUE, '', false, false, false, false, …
Run Code Online (Sandbox Code Playgroud) 我使用 spring AMQP 创建了一个消息侦听器,我用它来接收订单 POJO。消息类型是 application/json,所以我设置了一个 jackson 消息转换器。到目前为止,一切正常,我能够在我的侦听器中自动重新创建订单 POJO 对象。但是我想扩展这个例子并想检查我的监听器中的一些消息属性。因此,我想使用“org.springframework.amqp.core.Message”作为参数,而不是在我的 handleMessage() 中使用我的 Order POJO。然后我可以稍后转换正文,但通过这种方式,我将在我的侦听器中拥有所有与消息相关的属性,我可以在我的应用程序中使用这些属性。
我尝试将 handleMessage() 与 Message 参数一起使用,但似乎它也尝试使用 jackson 转换器来转换消息正文。我不确定在哪里传递 Order POJO 类 jackson 可以用来转换我的消息正文,但仍然应该能够正确转换消息。
请在下面找到我的代码中的重要片段。请帮助我,因为我认为我在这方面遇到了障碍。
POJO
public class Order {
private int orderid;
private String itemDescription;
Run Code Online (Sandbox Code Playgroud)
模板和转换器设置
@Bean
public RabbitTemplate rubeExchangeTemplate() {
logger.info("Lets test autowiring " + rabbitConnectionFactory.getHost());
RabbitTemplate r = new RabbitTemplate(this.rabbitConnectionFactory);
r.setExchange("rmq-exchange");
r.setMessageConverter(jsonMessageConverter());
return r;
}
@Bean
public MessageConverter jsonMessageConverter()
{
final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setClassMapper(classMapper());
return converter;
}
@Bean
public DefaultClassMapper classMapper()
{ …
Run Code Online (Sandbox Code Playgroud) 我有一个 RabbitMQ 队列,它最初是这样声明的:
var result = _channel.QueueDeclare("NewQueue", true, false, false, null);
Run Code Online (Sandbox Code Playgroud)
我正在尝试添加死信交换,所以我将代码更改为:
_channel.ExchangeDeclare("dl.exchange", "direct");
Dictionary<string, object> args = new Dictionary<string, object>()
{
{ "x-dead-letter-exchange", "dl.exchange" }
};
var result = _channel.QueueDeclare("NewQueue", true, false, false, args);
Run Code Online (Sandbox Code Playgroud)
当我运行这个时,我收到错误:
抛出异常:RabbitMQ.Client.dll 中的“RabbitMQ.Client.Exceptions.OperationInterruptedException”
附加信息:AMQP 操作被中断:AMQP close-reason,由 Peer 发起,code=406,text="PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'NewQueue' in vhost'/': received 'longstr' 类型的值 'dl.exchange' 但当前为 none", classId=50, methodId=10, cause=
该错误似乎不言自明,如果我删除队列,当我重新创建它时,我不会收到错误消息,但我的问题是:有没有办法在不删除队列的情况下进行此更改?
我使用 Rabbit MQ 发送和接收 JSON 消息。
我已经使用 RabbitTemplate 实例和 convertAndSend 方法实现了一个应用程序来向交换发送消息,如下所示:
rabbitTemplate.convertAndSend(exchangeNameOut, message.getString(PERSISTENCE_MESSAGE_ROUTING_KEY),
message.getString(PERSISTENCE_MESSAGE_BODY), new CorrelationData(""+analyticRuntime.getId()));
Run Code Online (Sandbox Code Playgroud)
在检索消息时,侧接收器,我实现了以下方法:
@RabbitListener(queues = "${rabbit.queue.mail.name}",containerFactory = "rabbitListenerContainerFactory")
public void processMailMessage(Message message) {
log.info("ENTER [processMailMessage] ");
Mail mail;
JSONObject messageBody = new JSONObject(new String(message.getBody()));
String asset = "" + messageBody.get(Constants.PERSISTENCE_MESSAGE_ASSET_ID_KEY);
String body = "" + messageBody.get(Constants.PERSISTENCE_MESSAGE_EVENT_KEY);
String alarms = "";
log.info("[processMailMessage] message.getMessageProperties().getCorrelationId() : " + message.getMessageProperties().getCorrelationId()";
}
Run Code Online (Sandbox Code Playgroud)
问题是:
谢谢
我正在运行 64 位 Windows 10,我安装了 Erlang 64 位 R16B03,设置了 ERLANG_HOME 系统环境变量(不是用户环境变量)并验证它可以从命令外壳正常工作。当我运行 RabbitMQ3.6.6.exe 进行设置时,出现错误“无法检测到 Erlang”。RabbitMQ 3.6.6 需要什么版本的 Erlang ??Erlang 必须安装在 C:\Program Files 中吗??我将它安装在 C:\Erlang 文件夹中。
我还尝试将 %ERLANG_HOME%\bin 添加到 Path ,但没有任何区别。我已经安装了 VC++ 可再发行 exe(2005、2008、2013、2015)。我尝试了 Erlang 的 OTP 19.2 (erl8.2) 版本,结果相同。有任何想法吗?
我试图避免从 RabbitMQ zip 文件手动安装,因为我是 RabbitMQ 的新手,只是想启动并运行它以便我可以学习它。
谢谢
我花了一整天的时间来尝试启动 spring-amqp 示例项目并针对 docker 版本的 rabbitmq 运行。我只是在运行标准的rabbitmq docker。虽然连接没有问题,但我总是遇到与创建队列相关的异常,此时我已经尝试了所有可能的变体。
我试图像示例项目一样在我的配置中声明队列。我试过显式配置 RabbitAdmin。我试过显式配置整个自动配置的混乱。我在 rabbitmq 中创建了一个新用户并明确分配了权限。我试过只使用队列名称而不实际声明队列 bean。我尝试将队列和 RabbitAdmin 注入其他 bean,只是为了强制创建它们。
无论我尝试什么,我都会收到以下错误:
2017-04-24 17:42:19.709 WARN 37360 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue:"incoming"
2017-04-24 17:42:19.715 WARN 37360 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):["incoming"]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:621) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:520) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1382) [spring-rabbit-1.7.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.2.jar:4.0.2]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.2.jar:4.0.2]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.2.jar:4.0.2]
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:992) ~[amqp-client-4.0.2.jar:4.0.2]
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:50) ~[amqp-client-4.0.2.jar:4.0.2]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native …
Run Code Online (Sandbox Code Playgroud) 我有一个场景,客户端通过 STOMP 将消息发布到 RabbitMQ/queue
或/topic
目标。
另一方面,我有一个通过 AMQP 连接到 RabbitMQ 的服务器进程。是否可以从 AMQP 访问“STOMP 通道”,或者它们是否被隔离到自己的命名空间中?
看起来 AMQP 通道可以通过 STOMP 访问,/amq/queue/
但我没有找到另一个方向的案例。