akka-streams与akka-cluster

Wil*_* Am 4 scala akka akka-cluster akka-stream

我的akka​​-streams还在继续学习.我想将我的akka​​-streams应用程序与akka-cluster和DistributedPubSubMediator集成.

添加对Publish的支持是相当直接的,但订阅部分我遇到了麻烦.

作为参考,订户在Typesafe样本中给出如下:

class ChatClient(name: String) extends Actor {
  val mediator = DistributedPubSub(context.system).mediator
  mediator ! Subscribe("some topic", self)

  def receive = {
    case ChatClient.Message(from, text) =>
      ...process message...
  }
}
Run Code Online (Sandbox Code Playgroud)

我的问题是,我应该如何将这个演员与我的流程集成,如何在没有流背压的情况下确保我获得发布消息?

我正在尝试完成一个pubsub模型,其中一个流可以发布消息而另一个流将使用它(如果订阅).

小智 8

你可能想让你的Actor扩展ActorPublisher.然后,您可以从中创建一个Source并将其集成到您的流中.

请参阅此处有关ActorPublisher的文档:http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-integrations.html