我试图理解为什么我想在 RabbitMQ 中使用 Spring 云流。我看过 RabbitMQ Spring 教程 4 ( https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html ),这基本上就是我想要做的。它创建了一个带有 2 个附加队列的直接交换,并根据路由键将消息路由到 Q1 或 Q2。
如果您查看教程,则整个过程非常简单,您创建所有部件,将它们绑定在一起,然后就可以开始了。
我想知道使用 Sing Cloud Stream 有什么好处,如果这就是它的用例。创建一个简单的交换很容易,甚至可以通过流直接定义目的地和组。所以我想为什么不更进一步并尝试使用流处理教程案例。
我已经看到 Stream 有一个BinderAwareChannelResolver似乎做同样的事情。但是我正在努力将它们放在一起以实现与 RabbitMQ Spring 教程中相同的效果。我不确定这是否是一个依赖问题,但我似乎在这里从根本上误解了一些东西,我认为是这样的:
spring.cloud.stream.bindings.output.destination=myDestination
spring.cloud.stream.bindings.output.group=consumerGroup
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key'
Run Code Online (Sandbox Code Playgroud)
应该诀窍。
有没有人有一个源和接收器的最小示例,它基本上创建了一个直接交换,将 2 个队列绑定到它,并根据路由关键路由到这两个队列之一,如https://www.rabbitmq.com/tutorials /tutorial-four-spring-amqp.html?
编辑:
下面是一组最小的代码,它演示了如何执行我所要求的操作。我没有附上,build.gradle因为它很简单(但如果有人感兴趣,请告诉我)
application.properties: 设置生产者
spring.cloud.stream.bindings.output.destination=tut.direct
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type
Run Code Online (Sandbox Code Playgroud)
Sources.class: 设置制作人频道
public interface Sources {
String OUTPUT = "output";
@Output(Sources.OUTPUT)
MessageChannel output();
}
Run Code Online (Sandbox Code Playgroud)
StatusController.class:响应休息呼叫并使用特定的路由键发送消息
/**
* Status endpoint for the health-check service.
*/
@RestController
@EnableBinding(Sources.class) …Run Code Online (Sandbox Code Playgroud) 我目前正在编写一个与竹构建服务器交互的小库。测试是使用 pytest 完成的。我陷入了以下问题。我想测试一个运行直到满足某些状态的 while 循环。阅读 pytest 文档,我试图“模拟”/monkeypatch 状态,但它并没有真正起作用。我可能在这里做错了一些基本的错误:这是有问题的 while 循环:
# determine current status
running = self._is_a_build_running()
# turn on and off running powerplug while building
while running:
self.feedback.turn_off_success()
self.feedback.turn_on_running()
time.sleep(self.blinker_time)
self.feedback.turn_off_running()
self._update_builds_status()
running = self._is_a_build_running()
Run Code Online (Sandbox Code Playgroud)
所以我用 pytest 尝试的是为正面和负面创建一个固定装置,_is_a_build_running如下所示:
@pytest.fixture(scope='function')
def mock_is_a_build_running():
return False
Run Code Online (Sandbox Code Playgroud)
然后使用 ThreadPool 使用此测试方法(此处解释了如何从 python 中的线程获取返回值?),因为我还需要包含 while 循环的方法的结果。
def test_update_status_running(bamboopickups, monkeypatch,
mock_update_overall_data_positive,
mock_update_builds_status_positive,
mock_is_a_build_running):
monkeypatch.setattr('BambooPickup._update_overall_data', lambda x: mock_update_overall_data_positive)
monkeypatch.setattr('BambooPickup._update_builds_status', lambda x: mock_update_builds_status_positive)
pool = ThreadPool(processes=1)
async_result = pool.apply_async(bamboopickups.update_status())
monkeypatch.setattr('BambooPickup._update_overall_data', lambda x: mock_update_overall_data_positive)
monkeypatch.setattr('BambooPickup._is_a_build_running', …Run Code Online (Sandbox Code Playgroud) 这个问题是关于 Python Click库的。
我想单击以收集我的命令行参数。收集后,我想重用这些值。我不想要任何疯狂的回调链,只需使用返回值。默认情况下,单击禁用使用返回值并调用sys.exit()。
我想知道如何正确调用standalone_mode( http://click.pocoo.org/5/exceptions/#what-if-i-don-t-want-that ),以防我想使用装饰器样式。上面链接的文档仅显示了使用单击(手动)创建命令时的用法。甚至有可能吗?下面显示了一个最小的例子。它说明了点击sys.exit()返回后如何直接调用gatherarguments
import click
@click.command()
@click.option('--name', help='Enter Name')
@click.pass_context
def gatherarguments(ctx, name):
return ctx
def usectx(ctx):
print("Name is %s" % ctx.params.name)
if __name__ == '__main__':
ctx = gatherarguments()
print(ctx) # is never called
usectx(ctx) # is never called
Run Code Online (Sandbox Code Playgroud)
$ python test.py --name Your_Name
我希望这是无状态的,意味着没有任何click.group功能 - 我只想要结果,而我的应用程序不退出。