如何对来自无限流的传入事件进行分组?

Max*_*kov 5 scala akka-stream

我有无数的事件:

(timestamp, session_uid, traffic)
Run Code Online (Sandbox Code Playgroud)

...
(1448089943, session-1, 10)
(1448089944, session-1, 20)
(1448089945, session-2, 50)
(1448089946, session-1, 30)
(1448089947, session-2, 10)
(1448089948, session-3, 10)
...
Run Code Online (Sandbox Code Playgroud)

这些事件我想按session_uid分组,并计算每个会话的流量总和.

我编写了一个akka-streams使用有限流使用的流程groupBy(我的代码基于这个例子来自cookbook).但是对于无限流,它将无法工作,因为groupBy函数应该处理所有传入流,并且只有在此之后才准备返回结果.

我想我应该用超时实现分组,即如果我从最后一次超过5分钟没有收到指定stream_uid的事件,我应该返回此session_uid的分组事件.但是如何实现呢akka-streams

Ram*_*gil 3

我想出了一个有点粗糙的解决方案,但我认为它可以完成工作。

基本思想是使用keepAliveSource 的方法作为触发完成的计时器。

但要做到这一点,我们首先必须对数据进行一些抽象。计时器需要从原始源发送触发器或另一个元组值,因此:

sealed trait Data

object TimerTrigger extends Data
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data
Run Code Online (Sandbox Code Playgroud)

然后将我们的元组源转换为值源。我们仍然会使用groupBy类似于有限流情况的分组:

val originalSource : Source[(Long, String, Int), Unit] = ???

type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid

val groupedDataSource : Source[IDGroup, Unit] = 
  originalSource.map(t => Value(t._1, t._2, t._3))
                .groupBy(_.session_uid)
Run Code Online (Sandbox Code Playgroud)

棘手的部分是处理只是元组的分组:(String, Source[Value,Unit])。如果时间已过,我们需要计时器来通知我们,因此我们需要另一个抽象来知道我们是否仍在计算或由于超时而已完成计算:

sealed trait Sum {
  val sum : Int
}
case class StillComputing(val sum : Int) extends Sum
case class ComputedSum(val sum : Int) extends Sum

val zeroSum : Sum = StillComputing(0)
Run Code Online (Sandbox Code Playgroud)

现在我们可以汲取每组的源泉了。如果值源在 后没有产生任何内容,则将keepAlive发送。然后,来自keepAlive 的 与 TimerTrigger 或来自原始源的新值进行模式匹配:TimerTriggertimeOutData

val evaluateSum : ((Sum , Data)) => Sum = {
  case (runningSum, data) => { 
    data match {
      case TimerTrigger => ComputedSum(runningSum.sum)
      case v : Value    => StillComputing(runningSum.sum + v.traffic)
    }
  }
}//end val evaluateSum

type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid

def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult = 
  idGroup._1 -> idGroup._2.keepAlive(timeOut, () => TimerTrigger)
                          .scan(zeroSum)(evaluateSum)
                          .collect {case c : ComputedSum => c.sum}
                          .runWith(Sink.head)
Run Code Online (Sandbox Code Playgroud)

该集合应用于仅与完成的总和匹配的部分函数,​​因此仅在计时器触发后才到达接收器。

然后我们将此处理程序应用于出现的每个分组:

val timeOut = FiniteDuration(5, MINUTES)

val sumSource : Source[SumResult, Unit] = 
  groupedDataSource map handleGroup(timeOut)
Run Code Online (Sandbox Code Playgroud)

我们现在有一个 Source,(String,Future[Int])其中的 session_uid 和一个 Future,该 id 的流量总和。

就像我说的,虽然很复杂,但满足要求。另外,我不完全确定如果一个 uid 已经分组并且已经超时,但是随后出现具有相同 uid 的新值,会发生什么情况。