Som*_*ame 6 functional-programming scala stream fs2
我的事件流如下:
sealed trait Event
val eventStream: fs2.Stream[IO, Event] = //...
Run Code Online (Sandbox Code Playgroud)
我想将在一分钟内(即每分钟0秒到59秒)收到的事件分组。这听起来很简单fs2
val groupedEventsStream = eventStream groupAdjacentBy {event =>
TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis())
}
Run Code Online (Sandbox Code Playgroud)
问题在于分组功能不纯。它使用currentTimeMillis。我可以这样工作:
stream.evalMap(t => IO(System.currentTimeMillis(), t))
.groupAdjacentBy(t => TimeUnit.MILLISECONDS.toMinutes(t._1))
Run Code Online (Sandbox Code Playgroud)
事实是,我想避免使用元组添加笨拙的样板。还有其他解决方案吗?
或者在这种情况下使用不纯函数还不错吗?
您可以使用cats.effect.Clock删除一些样板:
def groupedEventsStream[A](stream: fs2.Stream[IO, A])
(implicit clock: Clock[IO], eq: Eq[Long]): fs2.Stream[IO, (Long, Chunk[(Long, A)])] =
stream.evalMap(t => clock.realTime(TimeUnit.MINUTES).map((_, t)))
.groupAdjacentBy(_._1)
Run Code Online (Sandbox Code Playgroud)