Tia*_*ira 13 java event-based-programming vert.x
在vert.x中,我可以向另一个Verticle发送消息并"异步等待"以进行回复.
问题是:我想向多个Verticle发送消息,并在所有Verticle回复时调用异步处理程序.
这是可能的还是有更好的设计来实现这个功能?
编辑:
我有一个垂直A,它发送消息到Verticle B,C和D.每个Verticle(B,C,D)对消息做一些事情并返回一些数据.然后,垂直A接收来自B,C,D的响应,并对所有数据执行某些操作.问题是我为每个发送的消息都有一个处理程序(一个用于A,一个用于B,一个用于C),我希望在所有回复到达时调用一个处理程序.
Yan*_*oto 12
从Vert.x 3.2开始,文档解释了如何使用Future和协调异步CompositeFuture.
因此,假设您想send通过事件总线进行两次调用,并在两者都成功时执行某些操作:
Future<Message> f1 = Future.future();
eventBus.send("first.address", "first message", f1.completer());
Future<Message> f2 = Future.future();
eventBus.send("second.address", "second message", f2.completer());
CompositeFuture.all(f1, f2).setHandler(result -> {
// business as usual
});
Run Code Online (Sandbox Code Playgroud)
最多可以将6个期货作为参数传递,或者它们可以作为列表传递.
最好的方法是使用 Reactive Extensions,它由Netflix 的 Rx.Java实现并由RxVertx Module提供。
种类繁多的运算符允许您执行诸如将多个异步调用的结果“压缩”到新结果中之类的操作,并对其执行任何您想要的操作。
我在 GitHub 上有一个简单的演示,其中包含:
final Observable<JsonObject> meters = observeMetricsSource(metricsAddress, METERS_BUS_REQUEST, "meters", rx);
final Observable<JsonObject> histograms = observeMetricsSource(metricsAddress, HISTOGRAMS_BUS_REQUEST, "histograms", rx);
subscribeAndRespondJson(zip(meters, histograms, (jo1, jo2) -> jo1.mergeIn(jo2)), req);
Run Code Online (Sandbox Code Playgroud)
此代码片段展示了来自两个事件总线异步交互的两个可观察量如何“压缩”(即合并)到一个最终 HTTP 响应中。
| 归档时间: |
|
| 查看次数: |
5929 次 |
| 最近记录: |