如何在 python 中使用 RabbitMQ 进行简单的单元测试?

dar*_*izp 5 rabbitmq pika

在我的单元测试中,我想简单地开始使用、发布消息、接收返回的响应并断言响应是否符合我的预期。然而,我已经尝试这样做几个小时了,但没有找到解决方案。

问题是我无法在类中定义停止消费的方法。我尝试过定义这样的方法:

def stop(self):
    self.channel.basic_cancel()
Run Code Online (Sandbox Code Playgroud)
def stop(self):
    self.channel.stop_consuming()
Run Code Online (Sandbox Code Playgroud)
def stop(self):
    self.connection.close()
Run Code Online (Sandbox Code Playgroud)

但似乎没有任何作用。我读到这是因为一旦执行start_consuming(),停止消费的唯一方法就是在发送消息后取消它。但如果我这样做,那么我将修改原始文件on_request,这对我的应用程序没有用,因为连接将在第一条消息后关闭。我找到了pytest-rabbitmq但文档对我来说不清楚,因此不知道我是否可以使用这个插件来实现我想要的。

basic_cancel顺便问一下,和stop_consuming之间有什么区别close

bum*_*bee 2

我不太清楚你的场景!根据我的理解,您可以用相同的方法创建连接和通道,以便您可以在需要时发布、消费、断言和停止消费

希望这可以帮助!

def test_rabbitmq():
    from pika import BlockingConnection, ConnectionParameters, PlainCredentials

    conn = BlockingConnection(ConnectionParameters(host='host', virtual_host='vhost', credentials=PlainCredentials('username', 'password')))
    channel = conn.channel()

    # define your consumer
    def on_message(channel, method_frame, header_frame, body):
        message = body.decode()
        # assert your message here
        # asset message == 'value'
        channel.basic_cancel('test-consumer')  # stops the consumer

    # define your publisher
    def publish_message(message):
        channel.basic_publish(exchange='', routing_key='', body=message')

    publish('your message')
    tag = channel.basic_consume(queue='queue', on_message_callback=on_message, consumer_tag='test-consumer')
Run Code Online (Sandbox Code Playgroud)

stop_consuming- 取消所有消费者,指示 start_consuming 循环退出。

basic_cancel- 此方法取消消费者。消费者标签将被作为输入。

close- 关闭连接/通道

参考