在java Flux中按对象属性分组

Sas*_*asa 10 java reactive-programming project-reactor

给定以下数据结构Data以及Flux<Data>根据某些属性实现分组为一系列列表的惯用方法是什么:

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;

class Scratch {
    private static class Data {
        private Integer key;
        private String value;

        public Data(Integer key, String value) {
            this.key = key;
            this.value = value;
        }

        public Integer getKey() {
            return key;
        }

        public String getValue() {
            return value;
        }

        public static Data of(Integer key, String value) {
            return new Data(key, value);
        }

        @Override
        public String toString() {
            return value;
        }
    }

    public static void main(String[] args) {
        Flux<Data> test = Flux.just(
                Data.of(1, "Hello"),
                Data.of(1, "world"),
                Data.of(2, "How"),
                Data.of(2, "are"),
                Data.of(2, "you"),
                Data.of(3, "Bye"));
        test.bufferUntil(new Predicate<Data>() {
            Integer prev = null;
            @Override
            public boolean test(Data next) {
                boolean collect = prev != null && !Objects.equals(prev, next.getKey());
                prev = next.getKey();
                return collect;
            }
        }, true).subscribe(e -> System.out.println(e.toString()));
    }
} 
Run Code Online (Sandbox Code Playgroud)

输出:

[Hello, world]
[How, are, you]
[Bye]
Run Code Online (Sandbox Code Playgroud)

我知道 Flux 上的 groupBy 函数,但这又给了我一个 Flux,而不是一个列表。我上面描述的当前解决方案有效,但感觉不是 100% 惯用的,因为我不得不使用匿名类而不是 lambda。我可以在 lambda 之外使用 lambda 和 AtomicReference,但这也不是 100% 正确。有什么建议?

Oma*_*IDI 6

您还可以使用collectMultimap它让您拥有Map<K, Collection<T>. 在这种情况下collectMultimap将返回Mono<Map<Integer,Collection<Data>>>::

 test.collectMultimap( Data::getKey )
     .subscribe( dataByKey -> System.out.println( dataByKey.toString() ) );
Run Code Online (Sandbox Code Playgroud)

输出:

{1=[Hello, world], 2=[How, are, you], 3=[Bye]}
Run Code Online (Sandbox Code Playgroud)


une*_*q95 5

这是使用 groupBy 运算符的解决方案。我已按公用键对数据进行分组。groupBy 运算符给我一个 GroupedFlux 的 Flux。List<Data>GroupedFlux 是 Fl​​ux 的子类,因此我应用 flatMap 并使用collectList 运算符将单个 groupedFlux 转换为 a 。像这样,我得到一个Flux<List<Data>>,然后按照您的要求订阅并打印。

test.groupBy(Data::getKey)
                .flatMap(Flux::collectList)
                .subscribe(listOfStringsHavingDataWithSameKey -> System.out.println(listOfStringsHavingDataWithSameKey.toString()));
Run Code Online (Sandbox Code Playgroud)

请查看FluxGroupedFlux的文档。