中断concatMap中的单个可观察对象

tir*_*r38 6 concatmap rx-java2

我使用concatMap长时间运行的操作一次处理一个项目流。在某些时候,我需要“中断”这个长时间运行的操作,但只针对当前项:

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    // Now I need to interrupt whichever longRunningOperation in
    // progress, but I don't want to interrupt the whole stream.
    // In other words, I want to force it to move onto the next
    // integer.
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer));
}
Run Code Online (Sandbox Code Playgroud)

我试图通过保留一次性的“内部”流并在适当的时间对其进行处理来解决此问题。但这没有用。内部流已配置,但是concatMap从不移至处理项目3。测试只是挂起(因为外部可观察到的源代码也永远不会完成/终止/配置)。

Disposable disposable = Disposables.empty();

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    disposable.dispose();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer))
            .doOnSubscribe(disposable -> {
                // save disposable so we can "interrupt" later
                System.out.println("Saving disposable for " + integer);
                Example.this.disposable = disposable;
            });
}
Run Code Online (Sandbox Code Playgroud)

即使这确实奏效,但依靠副作用似乎还是有点让人头疼。做到这一点的最佳方法是什么?

tir*_*r38 1

我有几乎相同的问题How to cancel individual network request in Retrofit with RxJava? 。我可以使用 aPublishSubject来“中断”

private PublishSubject interrupter;

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    interrupter.onComplete();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    interrupter = PublishSubject.create();

    return Observable
            .just("Start working on " + integer,
                    "Still working on " + integer,
                    "Almost done working on " + integer)
            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("Finally for " + integer))
            .takeUntil(interrupter);
}
Run Code Online (Sandbox Code Playgroud)