小编mai*_*nsi的帖子

Spring Cloud Stream RabbitMQ

我试图理解为什么我想在 RabbitMQ 中使用 Spring 云流。我看过 RabbitMQ Spring 教程 4 ( https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html ),这基本上就是我想要做的。它创建了一个带有 2 个附加队列的直接交换,并根据路由键将消息路由到 Q1 或 Q2。

如果您查看教程,则整个过程非常简单,您创建所有部件,将它们绑定在一起,然后就可以开始了。

我想知道使用 Sing Cloud Stream 有什么好处,如果这就是它的用例。创建一个简单的交换很容易,甚至可以通过流直接定义目的地和组。所以我想为什么不更进一步并尝试使用流处理教程案例。

我已经看到 Stream 有一个BinderAwareChannelResolver似乎做同样的事情。但是我正在努力将它们放在一起以实现与 RabbitMQ Spring 教程中相同的效果。我不确定这是否是一个依赖问题,但我似乎在这里从根本上误解了一些东西,我认为是这样的:

spring.cloud.stream.bindings.output.destination=myDestination
spring.cloud.stream.bindings.output.group=consumerGroup
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key'
Run Code Online (Sandbox Code Playgroud)

应该诀窍。

有没有人有一个源和接收器的最小示例,它基本上创建了一个直接交换,将 2 个队列绑定到它,并根据路由关键路由到这两个队列之一,如https://www.rabbitmq.com/tutorials /tutorial-four-spring-amqp.html

编辑

下面是一组最小的代码,它演示了如何执行我所要求的操作。我没有附上,build.gradle因为它很简单(但如果有人感兴趣,请告诉我)

application.properties: 设置生产者

spring.cloud.stream.bindings.output.destination=tut.direct
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type
Run Code Online (Sandbox Code Playgroud)

Sources.class: 设置制作人频道

public interface Sources {

    String OUTPUT = "output";

    @Output(Sources.OUTPUT)
    MessageChannel output();
}
Run Code Online (Sandbox Code Playgroud)

StatusController.class:响应休息呼叫并使用特定的路由键发送消息

/**
 * Status endpoint for the health-check service.
 */
@RestController
@EnableBinding(Sources.class) …
Run Code Online (Sandbox Code Playgroud)

spring spring-boot spring-cloud-stream

5
推荐指数
1
解决办法
2600
查看次数

如何使用pytest测试无限while循环

我目前正在编写一个与竹构建服务器交互的小库。测试是使用 pytest 完成的。我陷入了以下问题。我想测试一个运行直到满足某些状态的 while 循环。阅读 pytest 文档,我试图“模拟”/monkeypatch 状态,但它并没有真正起作用。我可能在这里做错了一些基本的错误:这是有问题的 while 循环:

    # determine current status
    running = self._is_a_build_running()

    # turn on and off running powerplug while building
    while running:
        self.feedback.turn_off_success()
        self.feedback.turn_on_running()
        time.sleep(self.blinker_time)
        self.feedback.turn_off_running()
        self._update_builds_status()
        running = self._is_a_build_running()
Run Code Online (Sandbox Code Playgroud)

所以我用 pytest 尝试的是为正面和负面创建一个固定装置,_is_a_build_running如下所示:

@pytest.fixture(scope='function')
def mock_is_a_build_running():
    return False
Run Code Online (Sandbox Code Playgroud)

然后使用 ThreadPool 使用此测试方法(此处解释了如何从 python 中的线程获取返回值?),因为我还需要包含 while 循环的方法的结果。

def test_update_status_running(bamboopickups, monkeypatch,
                   mock_update_overall_data_positive,
                   mock_update_builds_status_positive,
                   mock_is_a_build_running):
monkeypatch.setattr('BambooPickup._update_overall_data', lambda x: mock_update_overall_data_positive)
monkeypatch.setattr('BambooPickup._update_builds_status', lambda x: mock_update_builds_status_positive)

pool = ThreadPool(processes=1)
async_result = pool.apply_async(bamboopickups.update_status())

monkeypatch.setattr('BambooPickup._update_overall_data', lambda x: mock_update_overall_data_positive)
monkeypatch.setattr('BambooPickup._is_a_build_running', …
Run Code Online (Sandbox Code Playgroud)

python unit-testing monkeypatching mocking pytest

4
推荐指数
1
解决办法
6615
查看次数

python点击standalone_mode的用法

这个问题是关于 Python Click库的。

我想单击以收集我的命令行参数。收集后,我想重用这些值。我不想要任何疯狂的回调链,只需使用返回值。默认情况下,单击禁用使用返回值并调用sys.exit()

我想知道如何正确调用standalone_mode( http://click.pocoo.org/5/exceptions/#what-if-i-don-t-want-that ),以防我想使用装饰器样式。上面链接的文档仅显示了使用单击(手动)创建命令时的用法。甚至有可能吗?下面显示了一个最小的例子。它说明了点击sys.exit()返回后如何直接调用gatherarguments

import click

@click.command()
@click.option('--name', help='Enter Name')
@click.pass_context
def gatherarguments(ctx, name):
    return ctx

def usectx(ctx):
    print("Name is %s" % ctx.params.name)

if __name__ == '__main__':
    ctx = gatherarguments()
    print(ctx) # is never called
    usectx(ctx) # is never called 
Run Code Online (Sandbox Code Playgroud)

$ python test.py --name Your_Name

我希望这是无状态的,意味着没有任何click.group功能 - 我只想要结果,而我的应用程序不退出。

python python-click

3
推荐指数
1
解决办法
2019
查看次数