RxJava.顺序执行

Tim*_*ima 14 android rx-java rx-java2

在我的Android应用程序中,我有一个处理用户交互的演示者,包含一种请求管理器,如果需要,还可以通过请求管理器向请求管理器发送用户输入.

请求管理器本身包含服务器API并使用此RxJava处理服务器请求.

我有一个代码,每次用户输入消息并向服务器显示响应时,它会向服务器发送请求:

private Observable<List<Answer>> sendRequest(String request) {
    MyRequest request = new MyRequest();
    request.setInput(request);
    return Observable.fromCallable(() -> serverApi.process(request))
            .doOnNext(myResponse -> {
                // store some data
            })
            .map(MyResponse::getAnswers)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread());
}
Run Code Online (Sandbox Code Playgroud)

但是现在我需要有一种排队.用户可以在服务器响应之前发送新消息.应按顺序处理队列中的每条消息.即,在我们得到对第一条消息的响应后,将发送第二条消息,依此类推.

如果发生错误,则不应处理进一步的请求.

我还需要在RecyclerView中显示答案.

我不知道如何更改上面的代码来实现上述处理

我看到了一些问题.一方面,该队列可以由用户随时更新,另一方面,任何时候服务器发送响应消息应该从队列中删除.

也许有一个rxjava运算符或我错过的特殊方式.

我在这里看到了类似的答案,但是,"队列"是不变的. 使用RxJava和Retrofit进行N次连续api调用

我会非常感谢任何解决方案或链接

Dea*_* Xu 6

我没有找到任何优雅的native-RxJava解决方案.所以我会定制一个Subscriber来做你的工作.

对于你的3分:

  1. 对于顺序执行,我们创建一个单线程调度程序

    Scheduler sequential = Schedulers.from(Executors.newFixedThreadPool(1));

  2. 为了在发生错误时停止所有请求,我们应该一起订阅所有请求,而不是Flowable每次都创建一个请求.所以我们定义了以下函数(这里我请求Integer和响应String):

    void sendRequest(Integer request)

    Flowable<String> reciveResponse()

    并定义一个字段以建立请求和响应流的关联:

    FlowableProcessor<Integer> requestQueue = UnicastProcessor.create();

  3. 为了重新运行未发送的请求,我们定义了重新运行功能:

    void rerun()

然后我们可以使用它:

reciveResponse().subscribe(/**your subscriber**/)
Run Code Online (Sandbox Code Playgroud)

现在让我们实施它们.

发送请求时,我们只需将其推入 requestQueue

public void sendRequest(Integer request) {
  requestQueue.onNext(request);
}
Run Code Online (Sandbox Code Playgroud)

首先,要按顺序执行请求,我们应该安排工作sequential:

requestQueue
  .observeOn(sequential)
  .map(i -> mockLongTimeRequest(i)) // mock for your serverApi.process
  .observeOn(AndroidSchedulers.mainThread());
Run Code Online (Sandbox Code Playgroud)

第二,在发生错误时停止请求.这是默认行为.如果我们什么都不做,则错误将导致订阅中断,并且不会发出任何更多项目.

第三,重新运行未发送的请求.首先,因为本机运算符将取消流,如MapSubscriberdo(RxJava-2.1.0-FlowableMap#63):

try {
    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
    fail(ex);// fail will call cancel
    return;
}
Run Code Online (Sandbox Code Playgroud)

我们应该包装错误.这里我使用我的Try类来包装可能的异常,你可以使用任何其他可以包装异常的实现而不是抛出它:

    .map(i -> Try.to(() -> mockLongTimeRequest(i)))
Run Code Online (Sandbox Code Playgroud)

然后是习惯OnErrorStopSubscriber implements Subscriber<Try<T>>, Subscription.

它通常会请求和发出物品.当发生错误(实际上是发生故障Try)时,它停在那里并且不会请求或发出甚至下游请求它.调用rerun方法后,它将返回正在运行的statu并正常发出.这节课大约有80行.你可以在我的github上看到代码.

现在我们可以测试我们的代码:

public static void main(String[] args) throws InterruptedException {
  Q47264933 q = new Q47264933();
  IntStream.range(1, 10).forEach(i -> q.sendRequest(i));// emit 1 to 10
  q.reciveResponse().subscribe(e -> System.out.println("\tdo for: " + e));
  Thread.sleep(10000);
  q.rerun(); // re-run after 10s
  Thread.sleep(10000);// wait for it complete because the worker thread is deamon
}

private String mockLongTimeRequest(int i) {
  Thread.sleep((long) (1000 * Math.random()));
  if (i == 5) {
    throw new RuntimeException(); // error occur when request 5
  }
  return Integer.toString(i);
}
Run Code Online (Sandbox Code Playgroud)

并输出:

1 start at:129
1 done  at:948
2 start at:950
    do for: 1
2 done  at:1383
3 start at:1383
    do for: 2
3 done  at:1778
4 start at:1778
    do for: 3
4 done  at:2397
5 start at:2397
    do for: 4
error happen: java.lang.RuntimeException
6 start at:10129
6 done  at:10253
7 start at:10253
    do for: 6
7 done  at:10415
8 start at:10415
    do for: 7
8 done  at:10874
9 start at:10874
    do for: 8
9 done  at:11544
    do for: 9
Run Code Online (Sandbox Code Playgroud)

你可以看到它顺序运行.并在发生错误时停止.在调用rerun方法之后,它继续处理左未发送请求.

有关完整代码,请参阅我的github.


Tub*_*uby 2

对于这种行为,我使用 Flowable 背压实现。创建作为 api 请求流的父级的外部流,使用 maxConcurrency = 1 平面映射 api 请求并实现某种缓冲策略,这样您的 Flowable 就不会抛出异常。

Flowable.create(emitter -> {/* user input stream*/}, BackpressureStrategy.BUFFER)
                .onBackpressureBuffer(127, // buffer size
                        () -> {/* overflow action*/},
                        BackpressureOverflowStrategy.DROP_LATEST) // action when buffer exceeds 127
                .flatMap(request -> sendRequest(request), 1) // very important parameter
                .subscribe(results -> {
                    // work with results
                }, error -> {
                    // work with errors
                });
Run Code Online (Sandbox Code Playgroud)

它将缓冲用户输入到给定的阈值,然后丢弃它(如果不这样做,它将抛出异常,但用户不太可能超出这样的缓冲区),它将像队列一样按顺序1接1执行。如果库本身存在用于某种行为的运算符,请不要尝试自己实现此行为。

哦,我忘了提,你的sendRequest()方法必须返回 Flowable,或者你可以将其转换为 Flowable。

希望这可以帮助!