成像对象采用以下方法:
class A { List<B> getIds(){...} }
Run Code Online (Sandbox Code Playgroud)
现在我有一个A的集合作为输入; 而且我希望得到一组独特的ID,通常你会选择:
Set<B> ids = new HashSet<>();
for(A a : input){
ids.addAll(a.getIds());
}
Run Code Online (Sandbox Code Playgroud)
有没有办法使用流API在一行中执行相同操作,如下所示
Set<List<B>> set = input.stream().map((a) -> a.getIds()).collect(Collectors.toSet());
Run Code Online (Sandbox Code Playgroud)
但制作扁平的B组
我有从套接字接收的动态热数据流。我需要检查条件,如果值匹配,则跳转到步骤 3 并显示新消息。
final Flux<Msg> msgs = Flux.generate(receiver);
final Flux<Msg> processed = msgs
.map(this::checkCondition) //step1
.map(remote::doLongRunning) //optional step2
.map(this::processFurther) //step3
...
public Msg checkCondition(Msg msg) {
if(doCheck(msg)){
//is there a way to jump to step3 here ?
return new OtherMsg(msg, "someAdditionalData"))
} else{
return msg
}
}
Run Code Online (Sandbox Code Playgroud)
我能想到的唯一选择 - 是将 Flux 分开并将其组装回来,有没有更干净的方法?
final Flux<Msg> msgs = Flux.generate(receiver);
final Flux<OtherMsg> checked = msgs
.filter(this::doCheck) //step1
.map(msg -> new OtherMsg(msg, "someAdditionalData"));
final Flux<OtherMsg> unchecked = msgs
.filter(msg -> !doCheck(msg)) //step1
.map(remote::doLongRunning); //optional …Run Code Online (Sandbox Code Playgroud) 以下代码在receiverOptions和模板之间产生意外的循环依赖:
令人惊讶的是,如果从 spring 上下文中删除 kafkaProps,它会起作用。
看起来某些自动配置正在添加从模板到接收器选项的不必要的依赖项。
请建议配置 ReactiveKafkaConsumerTemplate 的正确方法。
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public Map<String, Object> kafkaProps() {
return Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092",
ConsumerConfig.GROUP_ID_CONFIG, DemoApplication.class.getSimpleName()
);
}
@Bean
public ReceiverOptions<String, String> receiverOptions(Map<String, Object> kafkaProps) {
return ReceiverOptions.<String, String>create(kafkaProps)
.withKeyDeserializer(new StringDeserializer())
.withValueDeserializer(new StringDeserializer())
.subscription(List.of("test-topic"));
}
@Bean
public ReactiveKafkaConsumerTemplate<String, String> template(ReceiverOptions<String, String> receiverOptions) {
return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
}
@Bean
public ApplicationRunner runner(ReactiveKafkaConsumerTemplate<String, String> kafkaReceiver) {
return args -> kafkaReceiver.receive()
.log()
.subscribe();
} …Run Code Online (Sandbox Code Playgroud)