Spring Cloud Stream RabbitMQ

mai*_*nsi 5 spring spring-boot spring-cloud-stream

我试图理解为什么我想在 RabbitMQ 中使用 Spring 云流。我看过 RabbitMQ Spring 教程 4 ( https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html ),这基本上就是我想要做的。它创建了一个带有 2 个附加队列的直接交换,并根据路由键将消息路由到 Q1 或 Q2。

如果您查看教程,则整个过程非常简单,您创建所有部件,将它们绑定在一起,然后就可以开始了。

我想知道使用 Sing Cloud Stream 有什么好处,如果这就是它的用例。创建一个简单的交换很容易,甚至可以通过流直接定义目的地和组。所以我想为什么不更进一步并尝试使用流处理教程案例。

我已经看到 Stream 有一个BinderAwareChannelResolver似乎做同样的事情。但是我正在努力将它们放在一起以实现与 RabbitMQ Spring 教程中相同的效果。我不确定这是否是一个依赖问题,但我似乎在这里从根本上误解了一些东西,我认为是这样的:

spring.cloud.stream.bindings.output.destination=myDestination
spring.cloud.stream.bindings.output.group=consumerGroup
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key'
Run Code Online (Sandbox Code Playgroud)

应该诀窍。

有没有人有一个源和接收器的最小示例,它基本上创建了一个直接交换,将 2 个队列绑定到它,并根据路由关键路由到这两个队列之一,如https://www.rabbitmq.com/tutorials /tutorial-four-spring-amqp.html

编辑

下面是一组最小的代码,它演示了如何执行我所要求的操作。我没有附上,build.gradle因为它很简单(但如果有人感兴趣,请告诉我)

application.properties: 设置生产者

spring.cloud.stream.bindings.output.destination=tut.direct
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type
Run Code Online (Sandbox Code Playgroud)

Sources.class: 设置制作人频道

public interface Sources {

    String OUTPUT = "output";

    @Output(Sources.OUTPUT)
    MessageChannel output();
}
Run Code Online (Sandbox Code Playgroud)

StatusController.class:响应休息呼叫并使用特定的路由键发送消息

/**
 * Status endpoint for the health-check service.
 */
@RestController
@EnableBinding(Sources.class)
public class StatusController {

    private int index;

    private int count;

    private final String[] keys = {"orange", "black", "green"};

    private Sources sources;

    private StatusService status;

    @Autowired
    public StatusController(Sources sources, StatusService status) {
        this.sources = sources;
        this.status = status;
    }

    /**
     * Service available, service returns "OK"'.
     * @return The Status of the service.
     */
    @RequestMapping("/status")
    public String status() {
        String status = this.status.getStatus();

        StringBuilder builder = new StringBuilder("Hello to ");
        if (++this.index == 3) {
            this.index = 0;
        }
        String key = keys[this.index];
        builder.append(key).append(' ');
        builder.append(Integer.toString(++this.count));
        String payload = builder.toString();
        log.info(payload);

        // add kv pair - routingkeyexpression (which matches 'type') will then evaluate
        // and add the value as routing key
        Message<String> msg = new GenericMessage<>(payload, Collections.singletonMap("type", key));
        sources.output().send(msg);

        // return rest call
        return status;
    }
}
Run Code Online (Sandbox Code Playgroud)

消费者方面的东西,属性:

spring.cloud.stream.bindings.input.destination=tut.direct
spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=orange
spring.cloud.stream.bindings.inputer.destination=tut.direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey=black
Run Code Online (Sandbox Code Playgroud)

Sinks.class

public interface Sinks {

    String INPUT = "input";

    @Input(Sinks.INPUT)
    SubscribableChannel input();

    String INPUTER = "inputer";

    @Input(Sinks.INPUTER)
    SubscribableChannel inputer();
}
Run Code Online (Sandbox Code Playgroud)

ReceiveStatus.class: 接收状态:

@EnableBinding(Sinks.class)
public class ReceiveStatus {
    @StreamListener(Sinks.INPUT)
    public void receiveStatusOrange(String msg) {
       log.info("I received a message. It was orange number: {}", msg);
    }

    @StreamListener(Sinks.INPUTER)
    public void receiveStatusBlack(String msg) {
        log.info("I received a message. It was black number: {}", msg);
    }
}
Run Code Online (Sandbox Code Playgroud)

小智 4

@EnableBindingSpring Cloud Stream 允许您使用 Spring Cloud Stream Binder 实现(Kafka、RabbitMQ、JMS 绑定器等)连接(通过)到外部消息系统,从而开发事件驱动的微服务应用程序。显然,Spring Cloud Stream 使用 Spring AMQP 来实现 RabbitMQ 绑定器。

适用BinderAwareChannelResolver于对生产者的动态绑定支持,我认为在您的情况下,它是关于配置交换并将消费者绑定到该交换。

例如,您需要有 2 个bindingRoutingKey根据您的条件具有适当集合的消费者,以及一个具有上述属性(路由键表达式、目的地)的生产者(组除外)。我注意到您已经配置了group出站通道。该group属性仅适用于消费者(因此为入境)。

您可能还想检查这个: https: //github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/57,因为我看到一些关于使用routing-key-expression. 具体来说,请检查使用表达式值的这一点。