在akka http上创建一个包含60+ API的REST Web服务时.如何选择是否应该选择akka流或akka演员?在他的帖子中,乔斯展示了两种在akka http上创建API的方法,但他没有显示何时我应该选择一个而不是另一个.
我试图了解在akka流媒体中做什么.例如:
val sink1:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _)
val flow=Flow[Int].fold[Int](0){(x,y)=> x+y}
val runnable = Source (1 to 10).viaMat(flow)(Keep.right).toMat(sink1)(Keep.both)
Run Code Online (Sandbox Code Playgroud)
谢谢阿伦
任何人都可以解释我地图和mapAsync与AKKA流之间的区别吗?在文档中,据说
可以使用mapAsync或mapAsyncUnordered执行涉及外部非基于流的服务的流转换和副作用
为什么我们不能简单地在这里映射?我假设Flow,Source,Sink都是Monadic,因此map应该在这些性质的延迟中正常工作?
有人可以清楚地解释这4种方法之间的区别吗?何时使用每一个更合适?一般来说这组方法的名称是什么?有更多方法可以完成同样的工作吗?链接到scaladoc也可以提供帮助.
-D-
为什么是例外
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
object TestExceptionHandling {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()(defaultActorSystem)
Source(List(1, 2, 3)).map { i =>
if (i == 2) {
throw new RuntimeException("Please, don't swallow me!")
} else {
i
}
}.runForeach { i =>
println(s"Received $i")
}
}
}
Run Code Online (Sandbox Code Playgroud)
默默地忽略了?我可以看到流在打印后停止Received 1,但没有记录任何内容.请注意,问题通常不是日志记录配置,因为如果我akka.log-config-on-start = on在application.conf文件中设置,我会看到很多输出.
我想使用SourceQueue将元素动态推送到Akka Stream源.Play控制器需要Source才能使用该chuncked方法传输结果.
由于Play使用自己的Akka Stream Sink,我无法使用Sink实现源队列,因为在方法使用之前源将被消耗chunked(除非我使用下面的hack).
如果我使用反应流发布器预先实现源队列,我能够使它工作,但它是一种"肮脏的黑客":
def sourceQueueAction = Action{
val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
//stupid example to push elements dynamically
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(t))
Ok.chunked(Source.fromPublisher(pub))
}
Run Code Online (Sandbox Code Playgroud)
是否有更简单的方法在PlayFramework中使用Akka Streams SourceQueue?
谢谢
我想使用Akka Streams读取多个大文件来处理每一行.想象一下,每个键都包含一个(identifier -> value).如果找到新的标识符,我想将它及其值保存在数据库中; 否则,如果在处理行流时已找到标识符,我只想保存该值.对于这一点,我认为我需要某种形式的递归状态流量,以保持这种已经在发现标识符Map.我想我会在这个流程中收到一对(newLine, contextWithIdentifiers).
我刚刚开始研究Akka Streams.我想我可以管理自己做无状态处理的东西,但我不知道如何保持contextWithIdentifiers.我很欣赏指向正确方向的任何指示.
目的是从数据库流式传输数据,对此数据块执行一些计算(此计算返回某些案例类的Future),并将此数据作为分块响应发送给用户.目前,我能够流式传输数据并发送响应,而无需执行任何计算.但是,我无法执行此计算,然后流式传输结果.
这是我实施的路线.
def streamingDB1 =
path("streaming-db1") {
get {
val src = Source.fromPublisher(db.stream(getRds))
complete(src)
}
}
Run Code Online (Sandbox Code Playgroud)
函数getRds返回映射到case类的表的行(使用slick).现在考虑函数compute,它将每行作为输入并返回另一个案例类的Future.就像是
def compute(x: Tweet) : Future[TweetNew] = ?
Run Code Online (Sandbox Code Playgroud)
如何在变量src上实现此函数,并将此计算的分块响应(作为流)发送给用户.
我正在使用akka streams graphDSL来创建一个可运行的图.流组件的入口/出口没有编译时错误.运行时抛出以下错误:
任何想法我应该验证什么才能让它运行?
requirement failed: The inlets [] and outlets [] must correspond to the inlets [in] and outlets [out]
at scala.Predef$.require(Predef.scala:219)
at akka.stream.Shape.requireSamePortsAs(Shape.scala:168)
at akka.stream.impl.StreamLayout$CompositeModule.replaceShape(StreamLayout.scala:390)
at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:18)
at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:813)
at com.flipkart.connekt.busybees.streams.Topology$.bootstrap(Topology.scala:109)
at com.flipkart.connekt.busybees.BusyBeesBoot$.start(BusyBeesBoot.scala:65)
at com.flipkart.connekt.boot.Boot$.delayedEndpoint$com$flipkart$connekt$boot$Boot$1(Boot.scala:39)
at com.flipkart.connekt.boot.Boot$delayedInit$body.apply(Boot.scala:13)
Run Code Online (Sandbox Code Playgroud)
图结构:
source ~> flowRate ~> render ~> platformPartition.in
platformPartition.out(0) ~> formatIOS ~> apnsDispatcher ~> apnsEventCreator ~> merger.in(0)
platformPartition.out(1) ~> formatAndroid ~> httpDispatcher ~> gcmPoolFlow ~> rHandlerGCM ~> merger.in(1)
merger.out ~> evtCreator ~> Sink.ignore
Run Code Online (Sandbox Code Playgroud) 在Akka流中,Mat [Out,Mat]或Sink [In,Mat]中的Mat代表什么.它什么时候才会被使用?