Spring Cloud Stream - 在运行时路由到多个动态目的地

You*_*hah 1 spring spring-boot spring-cloud-stream

我有一个用例,我需要生成在运行时确定的多个 Kafka 主题/目的地。我尝试通过使用从类型的函数 bean返回并为每个参数设置标头(如此处所述)来将函数与多个输入和输出参数组合起来。我想出了以下实现:Flux<Message<T>>Functionspring.cloud.stream.sendto.destinationMessage

@Bean
public Function<Person, Flux<Message<Person>>> route() {
    return person -> Flux.fromIterable(Stream.of(person.getEvents())
            .map(e -> MessageBuilder.withPayload(person)
                    .setHeader("spring.cloud.stream.sendto.destination", e).build())
            .collect(Collectors.toList()));
}
Run Code Online (Sandbox Code Playgroud)

我的配置中也有这个:

spring.cloud.stream.dynamic-destinations=
Run Code Online (Sandbox Code Playgroud)

这是我的Person

@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person {
    private String[] events;
    private String name;
}
Run Code Online (Sandbox Code Playgroud)

events包含 Kafka 主题名称列表。

然而,这不起作用。我缺少什么?

sob*_*cko 5

spring.cloud.stream.sendto.destination内部使用BinderAwareChannelResolver,但已被弃用,取而代之的是StreamBridge. 我认为你可以重写你的代码如下。我还没有测试过,但这是模板:

@Autowired StreamBridge streamBridge;

@Bean
public Consumer<Person> route() {
    return person -> streamBridge.send(person.getName(), person);
}

Run Code Online (Sandbox Code Playgroud)

在幕后,Spring Cloud Stream 将Person动态创建绑定。

如果您在部署时提前知道目的地,也可以通过配置进行设置。对于例如spring.cloud.stream.sourceas foo;bar..;...。然后框架以 等形式创建输出绑定foo-out-0bar-out-0然后您需要设置目的地 - spring.cloud.stream.bindings.foo-out-0.destination=foo。但由于您的用例严格涉及动态目的地,因此您不能采用这种方法,而是尝试使用我上面建议的方法。