You*_*hah 1 spring spring-boot spring-cloud-stream
我有一个用例,我需要生成在运行时确定的多个 Kafka 主题/目的地。我尝试通过使用从类型的函数 bean返回并为每个参数设置标头(如此处所述)来将函数与多个输入和输出参数组合起来。我想出了以下实现:Flux<Message<T>>Functionspring.cloud.stream.sendto.destinationMessage
@Bean
public Function<Person, Flux<Message<Person>>> route() {
return person -> Flux.fromIterable(Stream.of(person.getEvents())
.map(e -> MessageBuilder.withPayload(person)
.setHeader("spring.cloud.stream.sendto.destination", e).build())
.collect(Collectors.toList()));
}
Run Code Online (Sandbox Code Playgroud)
我的配置中也有这个:
spring.cloud.stream.dynamic-destinations=
Run Code Online (Sandbox Code Playgroud)
这是我的Person:
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person {
private String[] events;
private String name;
}
Run Code Online (Sandbox Code Playgroud)
events包含 Kafka 主题名称列表。
然而,这不起作用。我缺少什么?
spring.cloud.stream.sendto.destination内部使用BinderAwareChannelResolver,但已被弃用,取而代之的是StreamBridge. 我认为你可以重写你的代码如下。我还没有测试过,但这是模板:
@Autowired StreamBridge streamBridge;
@Bean
public Consumer<Person> route() {
return person -> streamBridge.send(person.getName(), person);
}
Run Code Online (Sandbox Code Playgroud)
在幕后,Spring Cloud Stream 将Person动态创建绑定。
如果您在部署时提前知道目的地,也可以通过配置进行设置。对于例如spring.cloud.stream.sourceas foo;bar..;...。然后框架以 等形式创建输出绑定foo-out-0。bar-out-0然后您需要设置目的地 - spring.cloud.stream.bindings.foo-out-0.destination=foo。但由于您的用例严格涉及动态目的地,因此您不能采用这种方法,而是尝试使用我上面建议的方法。
| 归档时间: |
|
| 查看次数: |
2211 次 |
| 最近记录: |