如何创建一个可以通过方法调用稍后接收元素的Source?

ale*_*bit 50 scala akka akka-stream akka-http

我想Source在它上面创建一个和后来的推送元素,如:

val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
Run Code Online (Sandbox Code Playgroud)

建议的方法是什么?

谢谢!

Ram*_*gil 88

有三种方法可以实现:

1.使用SourceQueue发布实现

您可以使用Source.queue它将Flow实现为SourceQueue:

case class Weather(zipCode : String, temperature : Double, raining : Boolean)

val bufferSize = 100

//if the buffer fills up then this strategy drops the oldest elements
//upon the arrival of a new element.
val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val queue = Source.queue(bufferSize, overflowStrategy)
                  .filter(!_.raining)
                  .to(Sink foreach println)
                  .run() // in order to "keep" the queue Materialized value instead of the Sink's

queue offer Weather("02139", 32.0, true)
Run Code Online (Sandbox Code Playgroud)

2.使用Actor发布实现

还有一个类似的问题和答案在这里,要点是,你兑现了流作为ActorRef和邮件发送到REF:

val ref = Source.actorRef[Weather](Int.MaxValue, fail)
                .filter(!_.raining)
                .to(Sink foreach println )
                .run() // in order to "keep" the ref Materialized value instead of the Sink's

ref ! Weather("02139", 32.0, true)
Run Code Online (Sandbox Code Playgroud)

3.使用Actor实现预实现

同样,你可以明确地创建一个包含消息缓冲区的演员,使用该演员创造一个源,然后在答案中描述发送演员的消息在这里:

object WeatherForwarder {
  def props : Props = Props[WeatherForwarder]
}

//see provided link for example definition
class WeatherForwarder extends Actor {...}

val actorRef = actorSystem actorOf WeatherForwarder.props 

//note the stream has not been instatiated yet
actorRef ! Weather("02139", 32.0, true) 

//stream already has 1 Weather value to process which is sitting in the 
//ActorRef's internal buffer
val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}
Run Code Online (Sandbox Code Playgroud)

  • 队列预生成:http://stackoverflow.com/a/36467193/591922 (8认同)
  • @Loic 我没有从你的评论中得到“队列预实现”将是第四种可能的解决方案。这是。我发现这很好:http://stackoverflow.com/questions/37113877/how-can-i-use-and-return-source-queue-to-caller-without-materializing-it/37117205#37117205 (2认同)

Pet*_*osP 14

由于 Akka 2.5Source有一个preMaterialize方法。

根据文档,这看起来像是按照您的要求执行的指示方式:

在某些情况下,您需要一个Source物化值,然后才能Source将其连接到图形的其余部分。这在“物化价值驱动”源的情况下特别有用,例如Source.queueSource.actorRefSource.maybe

下面是一个关于如何使用SourceQueue. 元素在具体化之前和之后被推送到队列中,以及从Flow:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()


val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)
val (sourceMat, source) = sourceDecl.preMaterialize()

// Adding element before actual materialization
sourceMat.offer("pre materialization element")

val flow = Flow[String].map { e =>
  if(!e.contains("new")) {
    // Adding elements from within the flow
    sourceMat.offer("new element generated inside the flow")
  }
  s"Processing $e"
}

// Actually materializing with `run`
source.via(flow).to(Sink.foreach(println)).run()

// Adding element after materialization
sourceMat.offer("post materialization element")
Run Code Online (Sandbox Code Playgroud)

输出:

Processing pre materialization element
Processing post materialization element
Processing new element generated inside the flow
Processing new element generated inside the flow
Run Code Online (Sandbox Code Playgroud)


归档时间:

查看次数:

12771 次

最近记录:

6 年,1 月 前