我的应用程序有一个Akka-Websocket接口.Web套接字由actor-subscriber和actor发布者组成.订户通过将命令发送到相应的actor来处理命令.发布者侦听事件流并将更新信息发布回流(最后发送到客户端).这很好用.
我的问题:订阅者如何将事件发送回流?例如,确认执行收到的命令.
public class WebSocketApp extends HttpApp {
private static final Gson gson = new Gson();
@Override
public Route createRoute() {
return get(
path("metrics").route(handleWebSocketMessages(metrics()))
);
}
private Flow<Message, Message, ?> metrics() {
Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
Source<Message, ActorRef> metricsSource =
Source.actorPublisher(WebSocketDataPublisherActor.props())
.map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
return Flow.fromSinkAndSource(metricsSink, metricsSource);
}
}
Run Code Online (Sandbox Code Playgroud)
一个很好的解决方案可能是,订阅actor(WebSocketCommandSubscriber上面代码中的actor)可以将消息发送回流,如sender().tell(...)...