我有一个与用户相关的数据流.我也为每个用户都有一个状态,我可以从DB异步获取.
我想用每个用户的一个子流分离我的流,并在实现子流时为每个用户加载状态,以便可以相对于该状态处理子流的元素.
如果我不想下游合并子流,我可以做一些事情groupBy
和Sink.lazyInit
:
def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...
val treatByUser: Sink[Element] = Flow[Element].groupBy(
Int.MaxValue,
getUserId
).to(
Sink.lazyInit(
elt => getState(getUserId(elt)).map(treatUser),
??? // this is never called, since the subflow is created when an element comes
)
)
Run Code Online (Sandbox Code Playgroud)
但是,如果treatUser
成为a ,这不起作用Flow
,因为没有相应的Sink.lazyInit
.
由于groupBy
仅在推送新元素时才实现子流,因此应该可以使用此元素来实现子流,但是我无法调整groupBy的源代码以使此工作始终如一.同样,Sink.lazyInit
似乎不容易翻译成Flow
案例.
有关如何解决这个问题的任何想法?
您必须查看的相关 Akka 问题是#20129:添加 Sink.dynamic 和 Flow.dynamic。
在相关的 PR #20579中,他们实际上实现了LazySink
一些东西。
LazyFlow
他们下一步计划做的是:
将使用类似的签名执行下一个lazyFlow。
不幸的是,您必须等待该功能在 Akka 中实现或自己编写(然后考虑向 Akka 提交 PR)。
归档时间: |
|
查看次数: |
207 次 |
最近记录: |