tir*_*r38 6 concatmap rx-java2
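I use concatMap to process a stream of items one at a time, running a long-running operation on each. At some point I need to "interrupt" this long-running operation, but only for the current item: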
    @Test
    public void main() throws InterruptedException {
        TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
                .concatMap(this::doLongRunningOperation)
                .test();

        Thread.sleep(10000);
        System.out.println("interrupt NOW");
        // Now I need to interrupt whichever longRunningOperation is in
        // progress, but I don't want to interrupt the whole stream.
        // In other words, I want to force it to move on to the next
        // integer.
    }

    Observable<String> doLongRunningOperation(final Integer integer) {
        return Observable
                .just("\tStart working on " + integer,
                      "\tStill working on " + integer,
                      "\tAlmost done working on " + integer)
                // delay each item by 2 seconds
                .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
                .doOnNext(System.out::println)
                .doFinally(() -> System.out.println("\tfinally for " + integer));
    }
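I tried to solve this by keeping the Disposable of the "inner" stream and disposing it at the right time, but that didn't work. The inner stream is disposed, but concatMap never moves on to item 3. The test just hangs (because the outer observable never completes, terminates, or gets disposed either).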
    Disposable disposable = Disposables.empty();

    @Test
    public void main() throws InterruptedException {
        TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
                .concatMap(this::doLongRunningOperation)
                .test();

        Thread.sleep(10000);
        System.out.println("interrupt NOW");
        disposable.dispose();
        test.awaitTerminalEvent();
        System.out.println("terminal event");
    }

    Observable<String> doLongRunningOperation(final Integer integer) {
        return Observable
                .just("\tStart working on " + integer,
                      "\tStill working on " + integer,
                      "\tAlmost done working on " + integer)
                // delay each item by 2 seconds
                .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
                .doOnNext(System.out::println)
                .doFinally(() -> System.out.println("\tfinally for " + integer))
                .doOnSubscribe(disposable -> {
                    // save disposable so we can "interrupt" later
                    System.out.println("Saving disposable for " + integer);
                    Example.this.disposable = disposable;
                });
    }
Even if this did work, relying on a side effect seems a bit hacky. What is the best way to do this?
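For context, the hang in the dispose-based attempt can be reproduced without real delays. Disposing an inner source stops it silently: no onComplete or onError reaches concatMap, so concatMap keeps waiting for the current inner stream and never subscribes to the next one. The following is a minimal sketch, assuming RxJava 2 on the classpath; the class name DisposeInnerHangs and the simplified inner source (a single timer) are illustrative and not taken from the code above.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of the original question.
public class DisposeInnerHangs {

    public static TestObserver<String> run() {
        // Virtual time, so nothing actually sleeps.
        TestScheduler scheduler = new TestScheduler();
        AtomicReference<Disposable> inner = new AtomicReference<>();

        TestObserver<String> test = Observable.just(1, 2)
                .concatMap(i -> Observable.timer(2, TimeUnit.SECONDS, scheduler)
                        .map(tick -> "done " + i)
                        // capture the inner Disposable, as in the attempt above
                        .doOnSubscribe(inner::set))
                .test();

        // "Interrupt" the inner stream for item 1 by disposing it.
        inner.get().dispose();

        // Let plenty of virtual time pass; item 2 is never started.
        scheduler.advanceTimeBy(10, TimeUnit.SECONDS);
        return test;
    }

    public static void main(String[] args) {
        TestObserver<String> test = run();
        // Subscribed, but no values and no terminal event.
        test.assertEmpty();
        System.out.println("values received: " + test.valueCount());
    }
}
```

To move on to the next item, the inner stream has to end with a terminal event that concatMap can see, rather than being cut off from outside.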
I had almost the same problem in How to cancel individual network request in Retrofit with RxJava?. I was able to use a PublishSubject to "interrupt" the operation in progress:
    // Re-created for each item. Written by whichever thread runs concatMap's
    // mapper and read by the test thread, hence volatile.
    private volatile PublishSubject<Object> interrupter;

    @Test
    public void main() throws InterruptedException {
        TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
                .concatMap(this::doLongRunningOperation)
                .test();

        Thread.sleep(10000);
        System.out.println("interrupt NOW");
        // Ends only the current inner stream; concatMap then moves on
        // to the next integer.
        interrupter.onComplete();
        test.awaitTerminalEvent();
        System.out.println("terminal event");
    }

    Observable<String> doLongRunningOperation(final Integer integer) {
        interrupter = PublishSubject.create();
        return Observable
                .just("Start working on " + integer,
                      "Still working on " + integer,
                      "Almost done working on " + integer)
                // delay each item by 2 seconds
                .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
                .doOnNext(System.out::println)
                .doFinally(() -> System.out.println("Finally for " + integer))
                .takeUntil(interrupter);
    }
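The same takeUntil idea can be checked deterministically with a TestScheduler instead of Thread.sleep. The following is a minimal sketch, assuming RxJava 2 on the classpath; the class name InterruptCurrentItem, the shared skipCurrent subject, and the shortened messages are illustrative and not part of the code above. It signals with onNext rather than onComplete so that one subject can be reused for every item.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; a sketch of the takeUntil approach.
public class InterruptCurrentItem {

    // Three steps per item, each delayed by 2 seconds of virtual time.
    static Observable<String> work(int item, TestScheduler scheduler,
                                   Observable<Object> skipCurrent) {
        return Observable.just("start " + item, "still " + item, "almost " + item)
                .concatMap(s -> Observable.just(s).delay(2, TimeUnit.SECONDS, scheduler))
                // complete this inner stream as soon as skipCurrent emits
                .takeUntil(skipCurrent);
    }

    public static List<String> run() {
        TestScheduler scheduler = new TestScheduler();
        // Assumed shared signal: each onNext abandons the item in progress.
        PublishSubject<Object> skipCurrent = PublishSubject.create();

        TestObserver<String> test = Observable.just(1, 2, 3)
                .concatMap(i -> work(i, scheduler, skipCurrent))
                .test();

        // Item 1 finishes at 6s; item 2 emits its first step at 8s.
        scheduler.advanceTimeBy(8, TimeUnit.SECONDS);
        // Abandon item 2 only; concatMap subscribes to item 3 right away.
        skipCurrent.onNext(new Object());
        // Item 3 runs its three steps and the outer stream completes.
        scheduler.advanceTimeBy(6, TimeUnit.SECONDS);

        test.assertResult("start 1", "still 1", "almost 1",
                          "start 2",
                          "start 3", "still 3", "almost 3");
        return test.values();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the subject never terminates in this variant, each inner stream can subscribe to it in turn, and no field has to be reassigned per item.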