标签: akka-stream

akka http:Akka溪流与演员建立休息服务

在akka http上创建一个包含60+ API的REST Web服务时.如何选择是否应该选择akka流或akka演员?在他的帖子中,乔斯展示了两种在akka http上创建API的方法,但他没有显示何时我应该选择一个而不是另一个.

scala akka akka-stream akka-http

20
推荐指数
2
解决办法
5692
查看次数

akka stream toMat

我试图了解在akka流媒体中做什么.例如:

val sink1:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _)

val flow=Flow[Int].fold[Int](0){(x,y)=> x+y}

val runnable = Source (1 to 10).viaMat(flow)(Keep.right).toMat(sink1)(Keep.both)
Run Code Online (Sandbox Code Playgroud)
  1. 什么是使用viaMat vs via?
  2. 什么是toMat在viaMat到toMat之间做什么?
  3. 什么是keep.both的用途,是否意味着我可以从之前和当前的一个实现价值,如果是,那么我如何能够获得这些价值.

谢谢阿伦

streaming scala akka akka-stream

20
推荐指数
1
解决办法
4527
查看次数

map和mapAsync之间的区别

任何人都可以解释我地图和mapAsync与AKKA流之间的区别吗?在文档中,据说

可以使用mapAsync或mapAsyncUnordered执行涉及外部非基于流的服务的流转换和副作用

为什么我们不能简单地在这里映射?我假设Flow,Source,Sink都是Monadic,因此map应该在这些性质的延迟中正常工作?

scala akka-stream

19
推荐指数
1
解决办法
9291
查看次数

通过/ ViaMat/to/toMat在Akka Stream

有人可以清楚地解释这4种方法之间的区别吗?何时使用每一个更合适?一般来说这组方法的名称是什么?有更多方法可以完成同样的工作吗?链接到scaladoc也可以提供帮助.

-D-

akka-stream

19
推荐指数
1
解决办法
4084
查看次数

为什么Akka Streams吞噬了我的异常?

为什么是例外

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object TestExceptionHandling {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem()
    implicit val materializer = ActorMaterializer()(defaultActorSystem)

    Source(List(1, 2, 3)).map { i =>
      if (i == 2) {
        throw new RuntimeException("Please, don't swallow me!")
      } else {
        i
      }
    }.runForeach { i =>
      println(s"Received $i")
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

默默地忽略了?我可以看到流在打印后停止Received 1,但没有记录任何内容.请注意,问题通常不是日志记录配置,因为如果我akka.log-config-on-start = onapplication.conf文件中设置,我会看到很多输出.

scala exception-handling akka-stream

18
推荐指数
2
解决办法
5963
查看次数

如何在PlayFramework中使用Akka Streams SourceQueue

我想使用SourceQueue将元素动态推送到Akka Stream源.Play控制器需要Source才能使用该chuncked方法传输结果.
由于Play使用自己的Akka Stream Sink,我无法使用Sink实现源队列,因为在方法使用之前源将被消耗chunked(除非我使用下面的hack).

如果我使用反应流发布器预先实现源队列,我能够使它工作,但它是一种"肮脏的黑客":

def sourceQueueAction = Action{

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()

    //stupid example to push elements dynamically
    val tick = Source.tick(0 second, 1 second, "tick")
    tick.runForeach(t => queue.offer(t))

    Ok.chunked(Source.fromPublisher(pub))
  }
Run Code Online (Sandbox Code Playgroud)

是否有更简单的方法在PlayFramework中使用Akka Streams SourceQueue?

谢谢

scala akka playframework akka-stream

18
推荐指数
1
解决办法
3307
查看次数

Akka Streams:状态流畅

我想使用Akka Streams读取多个大文件来处理每一行.想象一下,每个键都包含一个(identifier -> value).如果找到新的标识符,我想将它及其值保存在数据库中; 否则,如果在处理行流时已找到标识符,我只想保存该值.对于这一点,我认为我需要某种形式的递归状态流量,以保持这种已经在发现标识符Map.我想我会在这个流程中收到一对(newLine, contextWithIdentifiers).

我刚刚开始研究Akka Streams.我想我可以管理自己做无状态处理的东西,但我不知道如何保持contextWithIdentifiers.我很欣赏指向正确方向的任何指示.

scala stateful stream akka akka-stream

16
推荐指数
1
解决办法
5699
查看次数

使用Akka Http转换Slick Streaming数据并发送Chunked Response

目的是从数据库流式传输数据,对此数据块执行一些计算(此计算返回某些案例类的Future),并将此数据作为分块响应发送给用户.目前,我能够流式传输数据并发送响应,而无需执行任何计算.但是,我无法执行此计算,然后流式传输结果.

这是我实施的路线.

def streamingDB1 =
path("streaming-db1") {
  get {
    val src = Source.fromPublisher(db.stream(getRds))
    complete(src)
  }
}
Run Code Online (Sandbox Code Playgroud)

函数getRds返回映射到case类的表的行(使用slick).现在考虑函数compute,它将每行作为输入并返回另一个案例类的Future.就像是

def compute(x: Tweet) : Future[TweetNew] = ?
Run Code Online (Sandbox Code Playgroud)

如何在变量src上实现此函数,并将此计算的分块响应(作为流)发送给用户.

scala akka slick akka-stream akka-http

15
推荐指数
1
解决办法
474
查看次数

图表创建错误:要求失败:入口[]和出口[]必须对应于入口[in]和出口[out]

我正在使用akka streams graphDSL来创建一个可运行的图.流组件的入口/出口没有编译时错误.运行时抛出以下错误:

任何想法我应该验证什么才能让它运行?

requirement failed: The inlets [] and outlets [] must correspond to the inlets [in] and outlets [out]
at scala.Predef$.require(Predef.scala:219)
at akka.stream.Shape.requireSamePortsAs(Shape.scala:168)
at akka.stream.impl.StreamLayout$CompositeModule.replaceShape(StreamLayout.scala:390)
at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:18)
at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:813)
at com.flipkart.connekt.busybees.streams.Topology$.bootstrap(Topology.scala:109)
at com.flipkart.connekt.busybees.BusyBeesBoot$.start(BusyBeesBoot.scala:65)
at com.flipkart.connekt.boot.Boot$.delayedEndpoint$com$flipkart$connekt$boot$Boot$1(Boot.scala:39)
at com.flipkart.connekt.boot.Boot$delayedInit$body.apply(Boot.scala:13)
Run Code Online (Sandbox Code Playgroud)

图结构:

source ~> flowRate ~> render ~> platformPartition.in
platformPartition.out(0) ~> formatIOS ~> apnsDispatcher ~> apnsEventCreator ~> merger.in(0)
platformPartition.out(1) ~> formatAndroid ~> httpDispatcher ~> gcmPoolFlow ~> rHandlerGCM ~> merger.in(1)
merger.out ~> evtCreator ~> Sink.ignore
Run Code Online (Sandbox Code Playgroud)

scala typesafe reactive-streams akka-stream

14
推荐指数
1
解决办法
1269
查看次数

Akka Streams:Mat代表什么来源[out,Mat]

在Akka流中,Mat [Out,Mat]或Sink [In,Mat]中的Mat代表什么.它什么时候才会被使用?

akka reactive-streams akka-stream

14
推荐指数
1
解决办法
1991
查看次数