小编sch*_*nas的帖子

使用 Project Reactor 管理对共享资源的访问

如何使用 Project Reactor 管理对共享资源的访问?

给定一个虚构的关键组件,该组件当时只能执行操作(文件存储、昂贵的远程服务等),如果存在对该组件的多个访问点(多个 API 方法、订户...)?如果资源可以自由执行该操作,则应该立即执行它,如果其他操作已经在进行中,则将我的操作添加到队列中,并在我的操作完成后完成我的 Mono。

我的想法是将任务添加到flux队列中,该队列一个接一个地执行任务,并返回一个Mono,一旦队列中的任务完成,该Mono就会完成,而不会阻塞。

class CriticalResource {

    private final Sinks.Many<Mono<?>> taskExecutor = Sinks.many()
                                                          .unicast()
                                                          .onBackpressureBuffer();

    private final Disposable taskExecutorDisposable =  taskExecutor.asFlux()
                                                                      .concatMap(Function.identity()) //this executes actions in sequential order
                                                                      .subscribe();


    public Mono<Void> resourceOperation1() {
        doSomething();
        .as(this::sequential);
    }


    public Mono<Void> resourceOperation2() {
        doSomethingElse();
        .as(this::sequential);
    }


    public Mono<Void> resourceOperation3() {
        doSomething();
        .then(somethingElse())
        .as(this::sequential);
    }

    private <T> Mono<T> sequential(Mono<T> action) {
        return Mono.defer(() -> {
            Sinks.One<T> actionResult = Sinks.one(); //create a new mono which should complete when our action …
Run Code Online (Sandbox Code Playgroud)

concurrency project-reactor reactive

7
推荐指数
1
解决办法
345
查看次数

已弃用的 WorkQueueProcessor 的替代方案

在最近的反应器版本中,WorkQueueProcessor已被弃用,并在 3.5 版本中被删除。对于新的 Sinks 规范,我想知道 WorkQueueProcessor 是否有替代方案?我希望以循环方式将信号仅分发给其中一个订阅者。

reactor reactive-programming flux project-reactor

6
推荐指数
0
解决办法
302
查看次数