groupBy的子流是否可以依赖于它们生成的密钥?

Cyr*_*pet 6 scala akka-stream

我有一个与用户相关的数据流.我也为每个用户都有一个状态,我可以从DB异步获取.

我想用每个用户的一个子流分离我的流,并在实现子流时为每个用户加载状态,以便可以相对于该状态处理子流的元素.

如果我不想下游合并子流,我可以做一些事情groupBySink.lazyInit:

def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...

val treatByUser: Sink[Element] = Flow[Element].groupBy(
  Int.MaxValue, 
  getUserId
).to(
  Sink.lazyInit(
    elt => getState(getUserId(elt)).map(treatUser),
    ??? // this is never called, since the subflow is created when an element comes
  )
)
Run Code Online (Sandbox Code Playgroud)

但是,如果treatUser成为a ,这不起作用Flow,因为没有相应的Sink.lazyInit.

由于groupBy仅在推送新元素时才实现子流,因此应该可以使用此元素来实现子流,但是我无法调整groupBy的源代码以使此工作始终如一.同样,Sink.lazyInit似乎不容易翻译成Flow案例.

有关如何解决这个问题的任何想法?

Fed*_*tta 2

您必须查看的相关 Akka 问题是#20129:添加 Sink.dynamic 和 Flow.dynamic

在相关的 PR #20579中,他们实际上实现了LazySink一些东西。

LazyFlow他们下一步计划做的是:

将使用类似的签名执行下一个lazyFlow。

不幸的是,您必须等待该功能在 Akka 中实现或自己编写(然后考虑向 Akka 提交 PR)。