Séb*_*mer 6 java rx-java spring-boot spring-cloud
TLDR:我在RxJava Observables中进行后台处理,我正在进行集成测试,我希望能够独立等待该处理完成,以确保从一次测试开始的后台处理不会干扰另一个测试.
简化,我有一个@RequestMapping方法,可以执行以下操作:
HttpStatus.NO_CONTENT)这种异步处理以前是用a完成的ThreadPoolTaskExecutor.我们将转换到RxJava并希望删除此ThreadPoolTaskExecutor并使用RxJava进行后台处理.
所以我试图这样做的时候非常天真:
Observable
.defer(() -> Observable.just(call to long blocking method)
.subscribeOn(Schedulers.io())
.subscribe();
Run Code Online (Sandbox Code Playgroud)
最终目标当然是,一步一步,进入"调用长阻塞方法"并一直使用Observable.
在此之前,我想首先使我的集成测试工作.我通过对映射执行RestTemplate调试来测试它.由于大多数工作是异步的,我的调用返回非常快.现在我想找到一种方法来等待异步处理完成(以确保它不与另一个测试冲突).
在RxJava之前,我只计算ThreadPoolTaskExecutor中的任务并等到它达到0.
我怎么能用RxJava做到这一点?
我尝试了什么:
我可能非常错误,或者过于复杂的事情,所以不要犹豫,纠正我的理由!
你怎么回来HttpStatus.NO_CONTENT?
@RequestMapping(value = "/")
public HttpStatus home() {
Observable.defer(() -> Observable.just(longMethod())
.subscribeOn(Schedulers.io())
.subscribe();
return HttpStatus.NO_CONTENT;
}
Run Code Online (Sandbox Code Playgroud)
在这种形式中,您无法知道何时longMethod完成。
如果您想知道所有异步作业何时完成,可以HttpStatus.NO_CONTENT在所有作业完成时返回,使用 SpringDefferedResult或使用TestSubscriber
PS:如果你愿意,你可以使用Observable.fromCallable(() -> longMethod());代替Observable.defer(() -> Observable.just(longMethod());
@RequestMapping(value = "/")
public DeferredResult<HttpStatus> index() {
DeferredResult<HttpStatus> deferredResult = new DeferredResult<HttpStatus>();
Observable.fromCallable(() -> longMethod())
.subscribeOn(Schedulers.io())
.subscribe(value -> {}, e -> deferredResult.setErrorResult(e.getMessage()), () -> deferredResult.setResult(HttpStatus.NO_CONTENT))
return deferredResult;
}
Run Code Online (Sandbox Code Playgroud)
像这样,如果你调用你的方法,你只会在你的 observable 完成时得到你的结果(所以,当longMethod完成时)
您必须注入 aTestSubscriber并在要求他等待/检查 Observable 的完成情况时:
@RequestMapping(value = "/")
public HttpStatus home() {
Observable.defer(() -> Observable.just(longMethod())
.subscribeOn(Schedulers.io())
.subscribe(subscriber); // you'll have to inject this subscriber in your test
return HttpStatus.NO_CONTENT;
}
Run Code Online (Sandbox Code Playgroud)
并在您的测试中:
TestSubscriber subscriber = new TestSubscriber(); // you'll have to inject it into your controller
// ....
controller.home();
subscriber.awaitTerminalEvent();
subscriber.assertCompleted(); // check that no error occurred
Run Code Online (Sandbox Code Playgroud)
几个月后,在游戏中:我的建议就是“不要这样做”。RxJava不太适合这种工作。无需过多详细说明,在后台运行大量“松散”的 Observable 是不合适的:根据请求的数量,您很容易陷入队列和内存问题,更重要的是所有计划和正在运行的任务会发生什么如果网络服务器崩溃了?你如何重新启动它?
Spring 提供了其他更好的替代方案:Spring Batch、Spring Cloud Task、使用Spring Cloud Stream进行消息传递,所以不要像我那样做,只需使用正确的工具来完成正确的工作。
现在如果你真的想走坏路:
lift运算符,将call订阅者(在方法中)包装在具有waitForCompletion方法的父订阅者中。如何等待取决于您(例如CountDownLatch)。该订阅者将被添加到同步列表中(并在完成后从其中删除),并且在您的测试中,您可以迭代该列表并调用waitForCompletion列表中的每个项目。这并不复杂,我已经让它工作了,但是请不要这样做!| 归档时间: |
|
| 查看次数: |
2451 次 |
| 最近记录: |