在后台运行pika ioloop或使用自定义ioloop

dgo*_*sen 4 python multithreading amqp rabbitmq

我觉得这应该不是那么困难,但到目前为止我没什么成功.

假设我有一个名为PikaClass的类,它包裹鼠兔并提供一些商业方法.

def PikaClass(object):
  def __init__(self):
     # connect to the broker
     self.connection = pika.SelectConnection(<connection parameters>, self.on_connect)
     # ..other init stuff..

  def on_connect(self, connection):
     # called when the connection has been established 
     # ..open a channel, declare some queues, etc.

  def start(self):
     # start the polling loop 
     self.connection.ioloop.start()

  def foo(self, **kwargs):
     # do some business logic, e.g., send messages to particular queues
Run Code Online (Sandbox Code Playgroud)

直观地说,这就是我想要实现的:用户创建一个实例PikaClass,在后台设置循环,然后通过调用某些业务方法与对象进行交互

p = PikaClass()
p.start()
bar = p.foo(..)
Run Code Online (Sandbox Code Playgroud)

问题是p.start()阻止并阻止主代码在调用start()后与对象交互.我的第一个想法是将调用包装在一个线程中:

Thread(target=p.start()).start()
bar = p.foo(..)
Run Code Online (Sandbox Code Playgroud)

但那仍然会阻止你永远不会到达p.foo(..).文档提到你不应该在线程之间共享连接,这样可能会在某处导致问题.

我也尝试使用AsyncoreConnection而不是SelectConnection,并直接调用_connect()(而不是使用ioloop),但这没有任何效果(没有任何反应).

那么如何在后台运行ioloop,或者至少运行我自己的ioloop?

注意:这是win64(xp)上的Python 2.6,最新的pika 0.9.4

die*_*sys 8

您正在调用'p.start'而不是将其作为参数传递.代码应该是:

Thread(target=p.start).start()
Run Code Online (Sandbox Code Playgroud)

线程将在执行Thread.start时调用p.start.

我不确定这是否能解决您的问题,但它可以帮助您达成解决方案.


Bra*_*des 6

GIL在这里不是问题,因为ioloop几乎所有的时间都花在select(2)系统调用上,在此期间GIL被释放,其他Python线程可以运行并执行其他任务.

最简单的方法是为每个请求设置和拆除队列连接.您可能认为这会太昂贵 - 因为它需要重新验证并且(可能)重复与每个连接的SSL协商 - 但它应该是最简单,最健壮和最容易编写的,除非您知道这一点,否则应该是您的控制因素设置和拆卸实际上会损害整个应用程序的性能(最好通过测试来衡量).

另一种方法是只start()ioloop你曾经有过一个短信发送,并有接收回应停止的方法ioloop,使你的程序重新获得控制.您可以ioloop提前返回:

    connection.ioloop.poller.open = False
Run Code Online (Sandbox Code Playgroud)

然后记得Truestart()再次呼叫等待另一个回复之前将其重新设置.