如何对无限通量进行排序?

Dmy*_*tro 2 java reactive-programming project-reactor spring-webflux

我有一个Flux由专用Processor含义生成的无限实例,每个元素都是通过发出的sink.nextReceiver如果重要的话,元素来自反应式 Kafka )。问题是,每次我尝试做一些有用的事情时sortFlux它只会给出一个空的结果。这也适用于reduce

难道我做错了什么?

编辑

这是一个更具体的例子,它给出了一个空的Flux

Flux.<Integer>create(sink -> {
  sink.next(1);
  sink.next(2);
  sink.next(3);
  sink.next(4);
})
    .sort() // If I remove this everthing works as expected
    .log()
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

谈到我的具体情况,这是我所拥有的简化版本:

    FluxProcessor<Message, Message> processor = ReplayProcessor.<Message>createTimeout(Duration.ofDays(1)).serialize();
    FluxSink<Message> sink = processor.sink();
    Flux<Message> pipeline = processor;

    kafka.receive()
        .log()
        .map(ReceiverRecord::value)
        .subscribe(sink::next);

    return pipeline; // Work with the pipeline later on
Run Code Online (Sandbox Code Playgroud)

然后,如果我尝试无论是.sort.reducepipeline它总是导致空Flux

Ole*_*uka 8

从来没有flux.sort()你的无限流

据我所知,当流中的顺序元素没有任何数学运算时,不可能对无限持续的数据流进行排序。

中的排序实现Flux将所有数据收集到普通集合中在此处输入图片说明 所以简而言之,你Flux<T>正在转变为Mono<List<T>> 下面

在此处输入图片说明

你可以从上面的图片看,有没有神奇的背后,这意味着该元素将只有当发射Flux<T>已经完成

在此处输入图片说明

所以在这种情况下

我有一个由专用的无限 Flux 实例

它根本无法观察到任何结果。

那我该怎么办?

基本上,如果您需要对数据进行排序,例如,在流中具有顺序的日志,并且您需要保证基于时间戳等的多个源的全局排序,那么有一个算法列表可以解决这个问题然后,您可以使用 Reactor API 快速实现(例如,Flux.zip-> flatMapIterble,其中您的可迭代对象是来自所有源的元素压缩在一起,这意味着您只需要对其中的几个进行排序,或者您可以实现另一个更复杂的逻辑与Flux.combine)

除此之外,如果可以观察集合的部分快照,并将其余部分保留在内存中,那么您可以Flux.scan在以下示例中使用like:


flux.scan(new ArrayList(), (collection, newItemToAdd) -> {

   // use bubble sort since your collection is already sorted,
   // so you will reduce complexity to O(n) instead of having `O(n^2)` or `O(nlogn)`

   return collection;
})

Run Code Online (Sandbox Code Playgroud)