针对不同消费者的每条消息的 Spring Cloud Stream 主题

idi*_*ous 2 spring spring-cloud spring-cloud-stream

我正在寻找的拓扑是

在此输入图像描述

到目前为止,我还没有看到在 Cloud Stream 中定义每条消息的主题的方法。我知道消费者将绑定到特定主题,但是生产者在将消息发送到交换器之前如何设置每条消息的主题?

source.output().send(MessageBuilder.withPayload(myMessage).build());
Run Code Online (Sandbox Code Playgroud)

不提供任何方法来设置交换的主题以路由到正确的消费者。

或者也许我不理解正确的东西?

更新

我希望不会在消费者中收到消息,因为bindingRoutingKey2222正在发送routeTo 1111. 但我仍然在消费者身上收到它。

生产者属性:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.output.destination=messageExchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']


@EnableBinding(Source.class)
@SpringBootApplication
public class Application {

   public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

}
Run Code Online (Sandbox Code Playgroud)

发件人:

source.output().send(MessageBuilder.withPayload(mo).setHeader("routeTo", "1111").build());

以及消费者:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.cloud.stream.bindings.input.destination=messageExchange
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=2222
Run Code Online (Sandbox Code Playgroud)

应用:

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

private static final Logger log = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
}

@StreamListener(Sink.INPUT)
public void ReceiveMo(String moDTO) {
    log.info("Message received moDTO: {}", moDTO);
}

}
Run Code Online (Sandbox Code Playgroud)

第二次更新

根据下面接受的答案中的建议。我能够让它发挥作用。需要使用其 UI 从 RabbitMQ 中删除交换和队列并重新启动 RabbitMQ docker 映像。

Gar*_*ell 5

rabbitmq的routingKeyExpression 生产者属性

例如...producer.routing-key-expression=headers['routeTo']

然后

source.output().send(MessageBuilder.withPayload(myMessage)
    .setHeader("routeTo", "Booking.new")
    .build());
Run Code Online (Sandbox Code Playgroud)

请注意,目的地是交换名称。默认情况下,绑定器期望进行Topic交换。如果您希望使用直接交换,则必须设置该exchangeType属性。