Rabbitmq-是否在同一频道上消费和发布?

cko*_*kot 5 python rabbitmq python-2.7 pika

我有两个通过pika连接到Rabbitmq的python进程。每个主题消耗一组主题,另一个主题作为响应发布。一种使用SelectConnection,另一种使用TornadoConnection。

目前,这两个都是测试程序,它们模拟用户与服务器之间的对话,并且每个程序的on_message()都经过简单的硬编码,以分支到接收到的routing_key上,并发布相应的响应。

最初,经过一段随机的时间(通常不超过2分钟),我会收到类似以下的错误消息:

UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead
Run Code Online (Sandbox Code Playgroud)

在搜索了有关堆栈溢出和其他地方的大量文章之后,我逐渐了解到此错误与竞态条件有关,竞态条件是在basic_publish完成之前消耗了一些东西。

我对代码进行了更改,因此,我不进行立即的basic_publish()而是将回调传递给connection.add_timeout(),延迟为1秒。进行此更改后,我可以进行大量运行,其中两个进程之间相互进行“对话”的时间大于1小时,而不会重现错误。

我的问题是,这仅仅是因为我在模拟一个用户而起作用?我需要两个消费和发布渠道吗?

def on_message(self, unused_channel, basic_deliver, properties, body):
    if self._sibling_app_id == properties.app_id:
        self.dispatch_message(basic_deliver, properties, body)


def dispatch_message(self, basic_deliver, properties, body):
    (user_id, msg_type) = basic_deliver.routing_key.rsplit('.', 1)

    if "login-response" == msg_type:
        print body
    elif "gid-assignment" == msg_type:
        print body
    elif "tutor-logout" == msg_type:
        print body
    elif "tutor-turn" == msg_type:
        message = "i don't know"
        routing_key = "%s.input" % user_id
        callback = self.delayed_publish_message(routing_key, message)
        self.schedule_next_message(callback, 1)
    elif "nlu" == msg_type:
        message = "dnk"
        routing_key = "%s.nlu-response" % user_id
        callback = self.delayed_publish_message(routing_key, message)
        self.schedule_next_message(callback, 1)
    else:
        print "invalid message-type: %s" % msg_type
        print body

def delayed_publish_message(self, routing_key, message):
    """returns a callback which can be passed to schedule_next_message()"""
    def delayed_publish_cb():
        self.publish_message(routing_key, message)
    return delayed_publish_cb


def schedule_next_message(self, cb, publish_interval=None):
    if self._stopping:
        return
    if publish_interval is None:
        publish_interval = self.PUBLISH_INTERVAL
    if -1 == publish_interval:
        return
    self._connection.add_timeout(publish_interval, cb)


def publish_message(self, routing_key, message):
    if self._stopping:
        return
    properties = pika.BasicProperties(app_id=self._app_id,
                                                          content_type='text/plain')
    self._channel.basic_publish(self.EXCHANGE, routing_key,
                                                 message, properties)
Run Code Online (Sandbox Code Playgroud)

its*_*ire 6

通道将被单向使用。该AMQP协议规范约为说的很清楚:

AMQP会话将两个单向通道相关联,以在两个容器之间形成双向的顺序对话。单个连接可能同时具有多个活动的独立会话,直至协商的通道限制。每个对等方都将“连接”和“会话”都建模为端点,这些端点存储与所讨论的“连接”或“会话”有关的本地和上次已知的远程状态。

因此,您应该为应用程序使用输入和输出通道。

  • 您引用的是 AMQP 1.0 规范,但 RabbitMQ 实现了版本 0.9.1。0.9.1规范中有类似的条款吗? (2认同)

cko*_*kot 1

我完成了我的承诺,正准备去睡觉,突然我想通了。我发现rabbitmq.com上的python教程仍然说要安装pika:

 sudo pip install pika==0.9.8
Run Code Online (Sandbox Code Playgroud)

虽然 0.9.8 是在 2012 年的某个时候发布的,但我认为该修复是在该版本发布后的某个时候添加的。0.9.9 于 2013 年某个时候发布

所以我做了:

sudo pip uninstall pika
Run Code Online (Sandbox Code Playgroud)

接下来是 pika 网站上的安装说明:

sudo pip install pika
Run Code Online (Sandbox Code Playgroud)

然后我将所有的connection.add_timeout(1,delayed_publish_cb)替换为basic_publish(),祈祷,运行它,我的两个进程在不到5分钟的时间内相互交换了大约200,000条消息,没有任何问题

很高兴知道 2012 年的错误修复仍然有效。

我必须让rabbitmq 的人知道更新他们的教程。