事件可以管理AMQP连接与异步传入和传出的消息吗?

Bra*_*des 8 python asynchronous amqp eventlet

实际设计:

对于那些回到这个问题的人,下面的有用答案将我推向了一个运行良好的可行设计.三个见解是关键:

  1. Eventlet是一个非常安全的环境 - 如果两个greenlet同时尝试recv()或同时send()从同一个套接字尝试,那么Eventlet会优雅地杀死第二个greenlet.这非常棒,意味着如果amqplib"绿色"不佳,将导致简单的异常,而不是不可能重现数据交错错误.
  2. 这些amqplib方法大致分为两组:wait()内部循环recv()直到组装AMQP消息,而其他方法send()消息返回并且不会尝试自己的recv().鉴于amqplib作者不知道有人试图"绿化"他们的图书馆,这是非常好的运气!这意味着消息发送不仅可以安全地从调用的回调中获取wait(),而且消息也可以安全地从完全不在wait()循环控制之外的其他greenlet发送.这些安全的方法 - 可以从任何greenlet调用,而不仅仅是从wait()回调调用 - 是:
    1. basic_ack
    2. basic_consumenowait=True
    3. basic_publish
    4. basic_recover
    5. basic_reject
    6. exchange_declarenowait=True
    7. exchange_deletenowait=True
    8. queue_bindnowait=True
    9. queue_unbindnowait=True
    10. queue_declarenowait=True
    11. queue_deletenowait=True
    12. queue_purgenowait=True
  3. 信号量可以用作锁:初始化与计数信号量1,然后acquire()release()锁定和解锁.我想要写消息的所有异步greenlet都可以使用这样的锁来避免它们的单独send()调用交错并破坏AMQP协议.

所以我的代码大致如下:

amqp = eventlet.patcher.import_patched('amqplib.client_0_8')

class Processor(object):
    def __init__(self):
        write_lock = eventlet.semaphore.Semaphore(1)

    def listening_greenlet(channel):
        # start this using eventlet.spawn_n()
        # create Connection and self.channel
        self.channel.basic_consume(queue, callback=self.consume)
        while True:
            self.channel.wait()

    def safe_publish(channel, *args, **kw):
        with write_lock:  # yes, Eventlet supports this!
            channel.basic_publish(*args, **kw)     

    def consume(message):
        # Returning immediately frees the wait() loop
        eventlet.spawn_n(self.process, message)

    def process(message):
        # do whatever I want
        # whenever I am done, I can async reply:
        self.safe_publish(...)
Run Code Online (Sandbox Code Playgroud)

请享用!

原始问题:

想象一下,每分钟都有数百条AMQP消息到达一个小的Python Eventlet应用程序,每个消息都需要处理和回答 - 处理的CPU开销很小,但可能需要等待其他服务和套接字的答案.

例如,为了允许一次处理100条消息,我当然可以向RabbitMQ提供100个单独的TCP连接,并为每个连接提供一个工作人员,以锁定步骤接收,处理和应答单个消息.但为了节省TCP连接,我宁愿只创建一个AMQP连接,允许RabbitMQ以全速的速度向我发送消息,将这些任务交给工作人员,并在每个工作人员完成时发回答案:

                                       +--------+
                                +------| worker | <-+
                                |      +--------+   |
                                |      +--------+   |
                                | +----| worker | <-+
                                | |    +--------+   |
                                | |    +--------+   |
                                | | +--| worker | <-+
                                | | |  +--------+   |
                                v v v               |
                           +------------+           |
 RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
                           +------------+
Run Code Online (Sandbox Code Playgroud)

观察:

  • 一个Eventlet队列可以优雅地在工作者之间分配传入的工作,因为它们可用于更多工作.
  • RabbitMQ的流量控制甚至可能是这样的:我只能在我的工作人员忙碌之前确认消息,然后在发送更多ACK之前等待,直到队列开始为空.
  • 工作几乎肯定是无序完成的:一个请求可能会很快完成,而另一个事先得到的事件需要更长的时间; 有些要求可能永远不会完成; 所以工人们将以不可预测的异步顺序回复回复.

在看到这篇有吸引力的博客文章后,我一直计划使用Eventlet和py-amqplib来编写这篇文章,该文章讲述了如何轻松地将AMQP库引入到Eventlet处理模型中:

http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/

我的问题是,在阅读了两个库的文档,amqplib源代码以及大部分的Eventlet源代码后,我无法弄清楚如何教授拥有AMQP连接的eventlet - connect_to_host()在博客文章中命名的eventlet -到醒了,当工人完成其工作,并生成一个答案.wait()amqplib中的方法只能通过AMQP套接字上的活动唤醒.虽然感觉我应该是能够有工人写他们的答案到队列,并有connect_to_host()eventlet唤醒无论是在新进入的邮件到达当工人准备好一个答案发送,我找不到任何方式对于eventlet说"叫醒我时,无论这些事情发生."

我确实发现,工作人员可以尝试使用AMQP连接对象 - 甚至是原始套接字 - 并通过TCP写回自己的消息; 但似乎有必要使用锁来防止传出的工作者消息相互交错或者与主侦听器eventlet写入的ACK消息相互交错,我也无法找到Eventlet中可用的锁.

所有这一切让我几乎可以肯定我正试图以某种方式完全倒退来解决这个问题.像这样的问题 - 让一个连接在监听器调度程序和许多工作者之间安全地共享 - 根本不映射到协程模型,并且需要一个完整的异步库?(在这种情况下:是否有一个你会推荐这个问题,以及如何在传入的消息和传出的工作者响应之间进行多路复用?我今天早些时候发现没有干净的解决方案尝试像Pika + ioloop这样的组合 - 尽管我刚看到另一个库,stormed_amqp,它可能比Pika做得更好.)或者,如果我想要可以实现此模型的干净且可维护的代码,我是否真的需要依靠实际的Python线程?我对所有选择持开放态度.

感谢您的帮助或想法!我一直在想我整个Python中的并发性很差,然后我再次学习我没有.:)我希望你无论如何都喜欢上面的ASCII艺术.

Ste*_*han 5

在阅读你的帖子并使用gevent作为eventlet之类的一个类似的库后,我发现了一些事情,因为我刚刚解决了类似的问题

一般来说,没有必要锁定,因为有过只有一个eventlet或greenlet在同一时间没有长的他们是阻止一切似乎都在同一时间运行运行..但YOUT不要想发送数据下来插座,而另一个greenlet发送给.你是对的,确实需要锁定.

如果我有这样的问题,那么查看文档是不够的..去看看源代码!它的开源无论如何你学到了更多关注其他人的代码.

这里有一些简化的示例代码可能会为您解决问题.

在你的调度员有2个队列

self.worker_queue = Queue() # queue for messages to workers
self.server_queue = Queue() # queue for messages to ampq server
Run Code Online (Sandbox Code Playgroud)

让工作人员将他们的结果放在服务器队列上.

发送和接收代码

def send_into_ampq():
    while True:
       message = dispatcher.get_workger_msg()

       try:
          connection.send(self.encode(message))
       except:
           connection.kill()

def read_from_ampq():
    while True:
        message = connection.wait()

        dispatcher.put_ampq_msg(self.decode(message))
Run Code Online (Sandbox Code Playgroud)

在连接代码的发送功能中

self._writelock = Semaphore(1) 
# this is a gevent locking thing. eventlet must have something like this too..
# just counts - 1 for locks and +1 for releases blocks otherwise blocks until 
# 0 agian.... why not google it i though.. and yes im right:
# eventlet.semaphore.Semaphore(value=1)

def send(self, message):
    """
    you need a write lock to prevent more greenlets
    sending more messages when previous sent is not done yet.
    """

    with self._writelock:
        self.socket.sendall(message)
Run Code Online (Sandbox Code Playgroud)