我们使用amqplib来发布/使用消息.我希望能够读取队列中的消息数(理想情况下都是已确认和未确认).这将允许我向管理员用户显示一个很好的状态图,并检测某个组件是否跟不上负载.
我在amqplib文档中找不到有关读取队列状态的任何信息.
有人能指出我正确的方向吗?
我正在使用RabbitMQ按主题将消息路由到感兴趣的订阅者.每个订阅者都有一个队列,我将队列绑定到他们感兴趣的主题.我想允许用户从他们的主题列表中删除一个项目.
在我的设置中,这将需要从该用户的队列中"解除绑定"绑定主题.
我正在使用pyamqplib,我没有看到通过通道对象做到这一点的方法.他们是从队列中删除以前绑定的路由密钥的方法吗?
我想向RabbitMQ服务器发送消息,然后等待回复消息(在"回复"队列上).当然,我不想永远等待处理这些消息的应用程序关闭 - 需要超时.这听起来像是一项非常基本的任务,但我找不到办法做到这一点.我现在用py-amqplib和RabbitMQ .NET客户端遇到了这个问题.
到目前为止,我已经得到了最好的解决方案是使用轮询basic_get与sleep在两者之间,但是这是很丑陋:
def _wait_for_message_with_timeout(channel, queue_name, timeout):
slept = 0
sleep_interval = 0.1
while slept < timeout:
reply = channel.basic_get(queue_name)
if reply is not None:
return reply
time.sleep(sleep_interval)
slept += sleep_interval
raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)
Run Code Online (Sandbox Code Playgroud)
当然还有更好的方法吗?
我正在使用git repo的最新主分支https://github.com/celery/librabbitmq并librabbitmq==2.0.0按照自述文件中的说明安装Python 3.6
您可以通过执行以下操作来克隆存储库:
$ git clone git://github.com/celery/librabbitmq.git
Run Code Online (Sandbox Code Playgroud)
然后通过执行以下操作安装它:
$ cd librabbitmq
$ make install # or make develop
Run Code Online (Sandbox Code Playgroud)
正常运行(在操作系统中安装了某些二进制文件进行c编译之后),但是当我随后执行一个小的a+b添加任务并对其进行调用时,add.delay(2,2)失败并出现以下错误。我抬头看,发现Celery 4使用json作为序列化器,所以很明显这不是因为pickle序列化
[2018-04-30 23:40:02,956:CRITICAL / MainProcess]不可恢复的错误:SystemError('返回了带有错误集的结果',)追溯(最近一次调用为最后):文件“ /Users/somghosh/.virtualenvs/ ctdb / lib / python3.6 / site-packages / kombu / messaging.py”第624行,在_receive_callback中,如果on_m则返回on_m(message),否则self.receive(已解码,消息)文件“ /Users/somghosh/.virtualenvs/ ctdb / lib / python3.6 / site-packages / celery / worker / consumer / consumer.py”,行570,在on_task_received回调中,文件“ /Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site- package / celery / worker / strategy.py”,第145行,位于task_message_handler句柄(要求)文件“ /Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/celery/worker/worker.py”中,第221行,在_process_task_sem中返回self._quick_acquire(self._process_task,req)在获取回调中的文件“ /Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/kombu/async/semaphore.py”,第62行(* partial_args,** partial_kwargs)文件“ /Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/celery/worker/worker.py”,第226行,位于_process_task req.execute_using_pool(自身。池)文件“ /Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/celery/worker/request.py”,行531,在execute_using_poolcorrelation_id …
我想创建一个能够OCR文本的进程场.我曾考虑使用由多个OCR进程读取的单个消息队列.
我想确保:
使用AMQP可以吗?
我打算用python和rabbitmq