小编Igo*_*nyi的帖子

Java 8集合流 - 将列表转换为Set,转换结果

成像对象采用以下方法:

class A { List<B> getIds(){...} }
Run Code Online (Sandbox Code Playgroud)

现在我有一个A的集合作为输入; 而且我希望得到一组独特的ID,通常你会选择:

Set<B> ids = new HashSet<>();
for(A a : input){
  ids.addAll(a.getIds());
}
Run Code Online (Sandbox Code Playgroud)

有没有办法使用流API在一行中执行相同操作,如下所示

Set<List<B>> set = input.stream().map((a) -> a.getIds()).collect(Collectors.toSet());
Run Code Online (Sandbox Code Playgroud)

但制作扁平的B组

java collections java-stream

3
推荐指数
1
解决办法
1069
查看次数

如何有选择地跳过 Flux 上的多个处理步骤

我有从套接字接收的动态热数据流。我需要检查条件,如果值匹配,则跳转到步骤 3 并显示新消息。

    final Flux<Msg> msgs = Flux.generate(receiver);

    final Flux<Msg> processed = msgs
        .map(this::checkCondition)  //step1
        .map(remote::doLongRunning) //optional step2
        .map(this::processFurther)  //step3
 ...

    public Msg checkCondition(Msg msg) {
        if(doCheck(msg)){
            //is there a way to jump to step3 here ?
            return new OtherMsg(msg, "someAdditionalData")) 
        } else{
            return msg
        }
    }

Run Code Online (Sandbox Code Playgroud)

我能想到的唯一选择 - 是将 Flux 分开并将其组装回来,有没有更干净的方法?

   final Flux<Msg> msgs = Flux.generate(receiver);

        final Flux<OtherMsg> checked = msgs
            .filter(this::doCheck) //step1
            .map(msg -> new OtherMsg(msg, "someAdditionalData"));

        final Flux<OtherMsg> unchecked = msgs
            .filter(msg -> !doCheck(msg)) //step1
            .map(remote::doLongRunning);  //optional …
Run Code Online (Sandbox Code Playgroud)

java flux project-reactor option-type

3
推荐指数
1
解决办法
2457
查看次数

结合 spring-kafka 和reactor-kafka 时出现意外的循环依赖

以下代码在receiverOptions和模板之间产生意外的循环依赖:

令人惊讶的是,如果从 spring 上下文中删除 kafkaProps,它会起作用。

看起来某些自动配置正在添加从模板到接收器选项的不必要的依赖项。

请建议配置 ReactiveKafkaConsumerTemplate 的正确方法。

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Map<String, Object> kafkaProps() {
        return Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092",
            ConsumerConfig.GROUP_ID_CONFIG, DemoApplication.class.getSimpleName()
        );
    }

    @Bean
    public ReceiverOptions<String, String> receiverOptions(Map<String, Object> kafkaProps) {
        return ReceiverOptions.<String, String>create(kafkaProps)
            .withKeyDeserializer(new StringDeserializer())
            .withValueDeserializer(new StringDeserializer())
            .subscription(List.of("test-topic"));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> template(ReceiverOptions<String, String> receiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }

    @Bean
    public ApplicationRunner runner(ReactiveKafkaConsumerTemplate<String, String> kafkaReceiver) {
        return args -> kafkaReceiver.receive()
            .log()
            .subscribe();
    } …
Run Code Online (Sandbox Code Playgroud)

circular-reference spring-boot project-reactor spring-kafka

3
推荐指数
1
解决办法
1227
查看次数