在我的单元测试中,我想简单地开始使用、发布消息、接收返回的响应并断言响应是否符合我的预期。然而,我已经尝试这样做几个小时了,但没有找到解决方案。
问题是我无法在类中定义停止消费的方法。我尝试过定义这样的方法:
def stop(self):
self.channel.basic_cancel()
Run Code Online (Sandbox Code Playgroud)
def stop(self):
self.channel.stop_consuming()
Run Code Online (Sandbox Code Playgroud)
def stop(self):
self.connection.close()
Run Code Online (Sandbox Code Playgroud)
但似乎没有任何作用。我读到这是因为一旦执行start_consuming(),停止消费的唯一方法就是在发送消息后取消它。但如果我这样做,那么我将修改原始文件on_request,这对我的应用程序没有用,因为连接将在第一条消息后关闭。我找到了pytest-rabbitmq但文档对我来说不清楚,因此不知道我是否可以使用这个插件来实现我想要的。
basic_cancel顺便问一下,和stop_consuming之间有什么区别close?
我不太清楚你的场景!根据我的理解,您可以用相同的方法创建连接和通道,以便您可以在需要时发布、消费、断言和停止消费
希望这可以帮助!
def test_rabbitmq():
from pika import BlockingConnection, ConnectionParameters, PlainCredentials
conn = BlockingConnection(ConnectionParameters(host='host', virtual_host='vhost', credentials=PlainCredentials('username', 'password')))
channel = conn.channel()
# define your consumer
def on_message(channel, method_frame, header_frame, body):
message = body.decode()
# assert your message here
# asset message == 'value'
channel.basic_cancel('test-consumer') # stops the consumer
# define your publisher
def publish_message(message):
channel.basic_publish(exchange='', routing_key='', body=message')
publish('your message')
tag = channel.basic_consume(queue='queue', on_message_callback=on_message, consumer_tag='test-consumer')
Run Code Online (Sandbox Code Playgroud)
stop_consuming- 取消所有消费者,指示 start_consuming 循环退出。
basic_cancel- 此方法取消消费者。消费者标签将被作为输入。
close- 关闭连接/通道
| 归档时间: |
|
| 查看次数: |
7283 次 |
| 最近记录: |