Opt*_*mus · 10 · python, rabbitmq, pika
I want to check whether any consumers/workers are present to consume the messages I'm about to send.
If there are no workers, I would start some (both consumers and publishers run on the same machine) and then go ahead and publish the messages.
If there were a function like connection.check_if_has_consumers, I would implement it roughly like this:
import pika
import workers

# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
    # start the workers in other processes, using python's `multiprocessing`
    workers.start_workers()

# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
                      properties=pika.BasicProperties(delivery_mode=2))
connection.close()
But I couldn't find any function like check_if_has_consumers in pika.
Is there a way to accomplish this using pika? Or by talking to rabbit directly?
I'm not completely sure, but I really think RabbitMQ is aware of the number of consumers subscribed to different queues, since it dispatches messages to them and accepts their acks.
I only started using RabbitMQ 3 hours ago... any help is welcome...
Here is the workers.py code I wrote, if that is of any help...
import multiprocessing
import time

import pika


def start_workers(num=3):
    """start workers as non-daemon processes"""
    for i in xrange(num):
        process = WorkerProcess()
        process.start()


class WorkerProcess(multiprocessing.Process):
    """
    worker process that waits infinitely for task msgs and calls
    the `callback` whenever it gets a msg
    """
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_working = multiprocessing.Event()

    def run(self):
        """
        worker method, open a channel through a pika connection and
        start consuming
        """
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost')
        )
        channel = connection.channel()
        channel.queue_declare(queue='worker_queue', auto_delete=False,
                              durable=True)

        # don't give work to one worker guy until he's finished
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(callback, queue='worker_queue')

        # do what `channel.start_consuming()` does but with stopping signal
        while len(channel._consumers) and not self.stop_working.is_set():
            channel.transport.connection.process_data_events()
        channel.stop_consuming()
        connection.close()
        return 0

    def signal_exit(self):
        """exit when finished with current loop"""
        self.stop_working.set()

    def exit(self):
        """exit worker, blocks until worker is finished and dead"""
        self.signal_exit()
        while self.is_alive():  # checking `is_alive()` on zombies kills them
            time.sleep(1)

    def kill(self):
        """kill now! should not use this, might create problems"""
        self.terminate()
        self.join()


def callback(channel, method, properties, body):
    """pika basic consume callback"""
    print 'GOT:', body

    # do some heavy lifting here
    result = save_to_database(body)

    print 'DONE:', result
    channel.basic_ack(delivery_tag=method.delivery_tag)
EDIT:
I have to move forward, so here is the workaround I'm going to take, unless a better approach comes up.

So, RabbitMQ has these HTTP management APIs, which work once you enable the management plugin. In the middle of the HTTP API page there are:

/api/connections - a list of all open connections.
/api/connections/name - an individual connection. DELETEing it will close the connection.

So, if I connect my workers and my publishers under different connection names/users, I'll be able to check whether the worker connection is open... (there might be problems when a worker dies...)

Will wait for a better solution...
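Incidentally, the same management plugin also exposes /api/queues/{vhost}/{name}, whose JSON reply includes a "consumers" field, so you can ask about the queue itself instead of tracking connection names. A Python 3 sketch using only the standard library (it assumes the plugin listens on localhost:15672 with the default guest/guest credentials; consumer_count and queue_has_consumers are hypothetical helper names, not pika or RabbitMQ APIs):

```python
import base64
import json
from urllib.request import Request, urlopen


def consumer_count(payload):
    """Extract the consumer count from a /api/queues JSON payload."""
    return json.loads(payload).get("consumers", 0)


def queue_has_consumers(queue, vhost="%2F", host="localhost", port=15672,
                        user="guest", password="guest"):
    """Ask the RabbitMQ management API whether `queue` has any consumers.

    Assumes the management plugin is enabled; "%2F" is the URL-encoded
    default vhost "/".
    """
    url = "http://%s:%d/api/queues/%s/%s" % (host, port, vhost, queue)
    auth = base64.b64encode(("%s:%s" % (user, password)).encode()).decode()
    request = Request(url, headers={"Authorization": "Basic " + auth})
    with urlopen(request) as response:
        return consumer_count(response.read()) > 0
```

This has the same caveat as polling connections: it is a point-in-time snapshot, so a worker can still die between the check and the publish.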
EDIT:
Just found this in the rabbitmq docs, but doing it from python is cumbersome:
shobhit@oracle:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
Listing queues ...
worker_queue 0
...done.
So I could do something like:

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'", shell=True)
Hacky... still hoping pika has some python function to do this...
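The rabbitmqctl output above is easy enough to parse without a shell pipeline or grep. A sketch (it assumes the script runs as a user allowed to invoke rabbitmqctl, e.g. via sudo; parse_consumer_counts and rabbitmqctl_consumer_count are hypothetical helper names):

```python
import subprocess


def parse_consumer_counts(output):
    """Parse `rabbitmqctl list_queues name consumers` output into a dict,
    skipping the "Listing queues ..." / "...done." banner lines."""
    counts = {}
    for line in output.splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[1].isdigit():
            counts[parts[0]] = int(parts[1])
    return counts


def rabbitmqctl_consumer_count(queue, vhost="/"):
    """Run rabbitmqctl and report the consumer count for `queue`
    (0 if the queue is not listed)."""
    output = subprocess.check_output(
        ["rabbitmqctl", "-p", vhost, "list_queues", "name", "consumers"])
    return parse_consumer_counts(output.decode()).get(queue, 0)
```

Passing the command as a list avoids shell=True and the quoting problems that come with it.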
Thanks,
小智 · 7
I was also just looking into this. After reading through the source code and docs, I came across the following in channel.py:
@property
def consumer_tags(self):
    """Property method that returns a list of currently active consumers

    :rtype: list

    """
    return self._consumers.keys()
My own testing was successful. I used the following, where my channel object is self._channel:
if len(self._channel.consumer_tags) == 0:
    LOGGER.info("Nobody is listening. I'll come back in a couple of minutes.")
    ...
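One caveat: consumer_tags only lists consumers registered on your own channel, not consumers on other connections. AMQP itself can answer the original question, though: a passive queue.declare asks the broker to report on an existing queue rather than create it, and pika surfaces the broker's reply, including a consumer_count field, on the returned Queue.DeclareOk frame. A sketch (assuming a reasonably recent pika; queue_consumer_count is a hypothetical helper name):

```python
def queue_consumer_count(channel, queue):
    """Count consumers on `queue` across all connections.

    `channel` is an open pika channel.  With passive=True the declare
    does not create the queue; the broker just reports its state (or
    raises an error if the queue does not exist).  The Queue.DeclareOk
    frame it returns carries `consumer_count` and `message_count`.
    """
    result = channel.queue_declare(queue=queue, passive=True)
    return result.method.consumer_count


# usage sketch (assumes a broker on localhost):
#   connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#   channel = connection.channel()
#   if queue_consumer_count(channel, 'worker_queue') == 0:
#       workers.start_workers()
```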