小编Cem*_*ser的帖子

Akka-Stream实现比单线程实现慢

2015-10-30更新


基于Roland Kuhn Awnser:

Akka Streams在Actors之间使用异步消息传递来实现流处理阶段.在异步边界上传递数据有一个开销,你在这里看到:你的计算似乎只需要大约160ns(来自单线程测量),而流式解决方案每个元素需要大约1μs,这是由消息传递决定的.

另一个误解是说"流"意味着并行性:在你的代码中,所有计算都在一个Actor(映射阶段)中顺序运行,因此对原始单线程解决方案没有任何好处.

为了从Akka Streams提供的并行性中受益,您需要具有多个处理阶段,每个阶段都执行任务

每个元素1μs,另见文档.

我做了一些改变.我的代码现在看起来像:

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  //RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)

  val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)

  val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder …
Run Code Online (Sandbox Code Playgroud)

scala reactive-streams akka-stream

10
推荐指数
2
解决办法
2473
查看次数

通过从Graph创建Source,Akka-stream UnsupportedOperationException

我正在尝试使用*subFlows连接流.因此,我从广播的出口建立了一个来源.但它抛出了一个UnsupportedOperationException: cannot replace the shape of the EmptyModule.我试图谷歌这个例外,但我找不到类似的东西.

在这里我的代码

val aggFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    val broadcast = builder.add(Broadcast[MonitoringMetricEvent](2))
    val bc = builder.add(Broadcast[Long](1))

    val zip = builder.add(ZipWith[StreamMeasurement, Long, (StreamMeasurement, Long)]((value, ewma) => (value, ewma)))
    val merge = builder.add(Merge[Seq[StreamMeasurement]](1))


    broadcast.out(1) ~> identityFlow ~> maxFlow ~> bc

    val source = Source.fromGraph(GraphDSL.create() { implicit bl =>
      SourceShape(bc.out(0))
    })

    broadcast.out(0) ~> identityFlow ~> topicFlow.groupBy(MAX_SUB_STREAMS, _._1)
        .map(_._2)
        .zip[Long](source)
        .takeWhile(deciderFunction)
        .map(_._1)
        .fold[Seq[StreamMeasurement]](Seq.empty[StreamMeasurement])((seq, sm) => seq:+sm)
      .mergeSubstreams ~>  merge



    FlowShape(broadcast.in, merge.out)
  })
Run Code Online (Sandbox Code Playgroud)

在这里得到的例外情况: …

scala stream-processing akka akka-stream

5
推荐指数
0
解决办法
182
查看次数

HashiCorp Vault填充kubernetes秘密

最近,我了解了HashiCorp Vault及其结合Kubernetes的用法。我发现了两个非常棒的博客文章,其中介绍了如何使用init容器和共享卷(post1post2)使用HashiCorp Vault快速生成凭据。Kubernetes还提供了一种使用Kubernetes机密处理凭据的好方法,它还使人们能够通过环境变量读取凭据。因此,它为秘密存储提供了很好的抽象。

我的问题是,还可以使用HashiCorp Vault 填充凭据填充Kubernetes Secrets,如何实现?

kubernetes hashicorp-vault kubernetes-secrets

4
推荐指数
1
解决办法
1202
查看次数

Akka-Streams收集数据(来源 - >流量 - >流量(收集) - >接收器)

我是斯卡拉和阿卡的全新人物.我有一个简单的RunnableFlow:

Source -> Flow (do some transformation) -> Sink.runForeach
Run Code Online (Sandbox Code Playgroud)

现在我想要这样的东西:

Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach
Run Code Online (Sandbox Code Playgroud)

但Flow2应该等到Flow1中的100个元素可用,然后将这100个元素转换为新元素(需要Flow1中的所有100个元素)并将此新元素提供给Sink.

我做了一些研究并找到了明确的用户定义缓冲区,但我不明白如何从flow2中的flow1访问所有100个元素并使用它们进行一些转换.有人可以解释一下吗?或者甚至更好地发布一个简单的小例子?或两者?

scala akka-stream

3
推荐指数
1
解决办法
2923
查看次数