在 Project reactor 中处理 ListenableFuture

Nis*_*ant 1 project-reactor

我已经开始玩项目反应器,并希望将我们的一个 API 移动到反应式的做事方式。我想知道什么是立即处理像 ListenableFuture 这样的事情。

就我而言,我使用的是 Cassandra,当我调用session.executeAsync()它时,它会返回一个扩展了 ListenableFuture 的 ResultSetFuture。下面是我现在编写的代码示例,我似乎对向客户公开 ListenableFuture 不满意。

public Mono<ListenableFuture<Void>> save(Publisher<AccountDTO> accountPublisher) {
    return Mono.just(accountPublisher)
            .map(accountDTO -> {
                Account accountEntity = modelMapper.map(accountDTO, Account.class);
                return mappingManager.mapper(Account.class).saveAsync(accountEntity);
            })
            .retry(1)
            .doOnError(throwable -> log.error("Unable to create account "))
            .mapError(throwable -> new MyCustomException(""));
}
Run Code Online (Sandbox Code Playgroud)

我的问题是:

公开 ListenableFuture 是否是一个好习惯,我个人不想将这样的任何东西回馈给他们可以阻止的客户端。在项目反应器中是否有更好的方法来处理这个问题,我可以只返回 Mono?

Sim*_*slé 6

您可以使用工厂方法轻松桥接ListenableFuture<Void>异步 API 以代替公开Mono<Void>, Mono.create()。该方法采用 a Consumer<Sink>,您将其作为 lambda 提供:

  1. 向调用的未来添加一个成功侦听器sink.success()(因为没有实际值,或者您也可以success(aVoid)使用Void监听器接收到的值)
  2. 向调用的未来添加一个失败侦听器 sink.error(failure)

差不多就是这样!请参阅参考文档create(尽管这个Flux版本提到了由于必须处理多个值而稍微复杂一些的版本):http : //projectreactor.io/docs/core/release/reference/docs/index.html#生产.创建


Nis*_*ant 5

发布我按照上面@Simon 的指导编码的代码片段。

@Override
public Mono<Void> save(AccountDTO accountDTO) {
    return Mono.create(voidMonoSink -> {

        Account account = converter.map(accountDTO, Account.class);

        ListenableFuture<Void> voidListenableFuture = mappingManager.mapper(Account.class).saveAsync(account);

        Futures.addCallback(voidListenableFuture, new FutureCallback<Void>() {

            @Override
            public void onSuccess(Void result) {
                voidMonoSink.success(result);
            }

            @Override
            public void onFailure(Throwable t) {
                log.error("Unable to save account " + accountDTO, t);
                voidMonoSink.error(new MyCustomException());
            }
        });
    });
}
Run Code Online (Sandbox Code Playgroud)