如何在Web插槽连接中从订阅者向发布者发送反应流中的消息

fs1*_*123 6 akka akka-stream akka-http

我的应用程序有一个Akka-Websocket接口.Web套接字由actor-subscriber和actor发布者组成.订户通过将命令发送到相应的actor来处理命令.发布者侦听事件流并将更新信息发布回流(最后发送到客户端).这很好用.

我的问题:订阅者如何将事件发送回流?例如,确认执行收到的命令.

public class WebSocketApp extends HttpApp {

  private static final Gson gson = new Gson();

  @Override
  public Route createRoute() {
    return get(
        path("metrics").route(handleWebSocketMessages(metrics()))
        );
  }

  private Flow<Message, Message, ?> metrics() {
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
    Source<Message, ActorRef> metricsSource = 
        Source.actorPublisher(WebSocketDataPublisherActor.props())
        .map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
    return Flow.fromSinkAndSource(metricsSink, metricsSource);
  }
}
Run Code Online (Sandbox Code Playgroud)

一个很好的解决方案可能是,订阅actor(WebSocketCommandSubscriber上面代码中的actor)可以将消息发送回流,如sender().tell(...)...

Vla*_*eev 5

不,至少不可能,不是直接的.流总是单向的 - 所有消息都在一个方向上流动,而对它们的需求则以相反的方向流动.您需要将确认消息从接收器传递给源,后者才能将其发送回客户端,例如,通过在接收器actor中注册源actor.它可能看起来像这样:

Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> {
    sinkActor.tell(new RegisterSource(sourceActor), null);
})
Run Code Online (Sandbox Code Playgroud)

然后,在您的接收器接收RegisterSource消息之后,它可以将确认消息发送到提供的ActorRef,然后将它们转发到输出流.