从EventStream创建源

zun*_*der 5 scala playframework server-sent-events reactive-streams akka-stream

我正在使用PlayFramework 2.5.3,并希望创建akka.stream.scaladsl.Source一个akka.event.EventStream(事件流是演员系统的一部分).事件流将产生来自某种类型的事件,因此我需要订阅该特定类型的事件并通过使用来推送它们play.api.mvc.Results.chunked.有没有简单的方法来创建这样的Source使用Akka Streams 2.4.5?

Vla*_*eev 5

您可以Source.actorRef与订阅一起使用.Source.actorRef是一个实现了一个的源ActorRef,所以你可以这样做:

// choose the buffer size of the actor source and how the actor
// will react to its overflow
val eventListenerSource = Source.actorRef[YourEventType](32, OverflowStrategy.dropHead)

// run the stream and obtain all materialized values
val (eventListener, ...) = eventListenerSource
    .viaMat(...)(Keep.left)
    <...>
    .run()

// subscribe the source actor to the stream
actorSystem.eventStream.subscribe(eventListener, classOf[YourEventType])

// now events emitted by the source will go to the actor
// and through it to the stream
Run Code Online (Sandbox Code Playgroud)

请注意,actorRef源有些限制,例如,它自然不支持其内部缓冲区的背压溢出策略.您可能希望使用Source.actorPublisher扩展ActorPublisher[YourEventType]特征的actor ,它会给您更多控制.然而,由于EventStream是一个纯粹的基于推送的源代码,你将不能够做更多的ActorPublisherSource.actorRef,所以你可能只是以及使用更简单的方法.