RxJava同时删除操作

Ped*_*dro 5 android rx-java rx-java2

我试图以一种被动的方式理解同一个observable的同时操作应该如何工作.

方案如下:

我有一个用户列表和一个删除按钮.每当我按下remove我正在调用API时:UsersApi.removeUser.可以同时删除多个用户.这意味着多个UsersApi.removeUser同时发生.

每次之后UsersApi.removeUser我都需要打个UsersApi.refreshUser电话

所以就伪代码而言,点击删除时我正在做的事情如下:

主持人:

public Observable<User> removeUser(int userId) {
        return UsersApi.removeUser(userId)
                .flatMap(user -> UsersApi.refreshUser(userId));
    }
Run Code Online (Sandbox Code Playgroud)

分段:

public void removeUser() {
   presenter.removeUser(userId)
     .subscribe(user -> {
        //remove user from ui
        // update number of total users
   })
}
Run Code Online (Sandbox Code Playgroud)

这种方法的问题在于,由于remove的异步性质(允许多次删除),我无法保证到达订阅的内容是最新的.订阅将达到两次,每次删除一次,用户信息可能不会更新或最新.那有意义吗?

我想要发生什么:

  1. 并行/同时使用被动方法删除呼叫(由用户多次删除点击触发)
  2. 删除呼叫完成后,启动下一个删除呼叫

编辑:我想知道的是如何做/如果可以使用Rx运算符做我做的解决方案(参见edit2).

Edit2:我的解决方案是将用户操作(在这种情况下remove)排入队列,并在UsersApi.refreshUser(userId)调用完成时使用PublishSubject发出.

基本上我所做的是(伪代码):

private final PublishSubject<UserOperation> userOperationObs;
private final ConcurrentLinkedQueue<UserOperation> pendingOperations;
private boolean executingOperation;

private void emitUserOperation(final UserOperation operation) {
      if (!executingOperation) {
            executingOperation = true;
            userOperationObs.onNext(operation);
        } else {
            executingOperation.add(operation);
        }
    }

public Observable<User> removeUser(UserOperation operation) {
        return UsersApi.removeUser(operation.getUserId)
                .switchMap(user -> UsersApi.refreshUser(operation.getUserId))
                .doOnNext(user -> {
                   executingOperation = false;
                   final UserOperation nextOperation = pendingOperations.poll();
                   if (nextOperation != null) {
                       userOperationObs.onNext(operation);
                   }
};
    }
Run Code Online (Sandbox Code Playgroud)

And*_*din 0

您可以将 UI 点击转换为 Observable(例如,通过使用RxBinding)。之后,您可以使用concatMap运算符执行 api 调用,这样一旦当前 api 调用完成,它将开始下一个网络调用。

// emit clicks as stream
Observable<?> clicks = RxView.clicks(removeView)

// listen clicks then perform network call in sequence
clicks.concatMap(ignored -> usersApi.refreshUser(userId))
Run Code Online (Sandbox Code Playgroud)