在函数流的 Scala fs2 库中:
我想明白之间的差别flatMap,flatTap,evalMap和evalTap。它们似乎都执行相同的操作,即流值的转换。
有什么区别以及何时应该使用它们中的每一个?
我想做这样的事情:
def splitStream[F, A](stream: fs2.Stream[F, A], split: A => B): (Stream[F, A], Stream[F, B)) =
(stream, stream.map(split)
Run Code Online (Sandbox Code Playgroud)
但这不起作用,因为它从源头“拉”了两次 - 当我同时排出stream和stream.map(split). 我如何防止这种情况?通过维护我自己的内部缓冲区以某种方式切换到基于“推送”的模型,这样我就不会拉两次?
我有一个外部(即,我无法更改它)Java API,如下所示:
public interface Sender {
void send(Event e);
}
Run Code Online (Sandbox Code Playgroud)
我需要实现一个Sender接受每个事件,将其转换为 JSON 对象,将其中一些收集到一个包中并通过 HTTP 发送到某个端点的。这一切都应该异步完成,没有send()阻塞调用线程,使用一些固定大小的缓冲区并在缓冲区已满时丢弃新事件。
使用 akka-streams 这很简单:我创建了一个阶段图(它使用 akka-http 发送 HTTP 请求),将它具体化并使用具体化ActorRef将新事件推送到流:
lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
.via(CustomBuffer(bufferSize)) // buffer all events
.groupedWithin(batchSize, flushDuration) // group events into chunks
.map(toBundle) // convert each chunk into a JSON message
.mapAsyncUnordered(1)(sendHttpRequest) // send an HTTP request
.toMat(Sink.foreach { response =>
// print HTTP response for debugging
})(Keep.both)
lazy val (eventsActor, completeFuture) = eventPipeline.run()
override def …Run Code Online (Sandbox Code Playgroud) 这是fs2文档中的一段代码。该函数go是递归的。问题是我们如何知道它是否是堆栈安全的,以及如何推断任何函数是否是堆栈安全的?
import fs2._
// import fs2._
def tk[F[_],O](n: Long): Pipe[F,O,O] = {
def go(s: Stream[F,O], n: Long): Pull[F,O,Unit] = {
s.pull.uncons.flatMap {
case Some((hd,tl)) =>
hd.size match {
case m if m <= n => Pull.output(hd) >> go(tl, n - m)
case m => Pull.output(hd.take(n.toInt)) >> Pull.done
}
case None => Pull.done
}
}
in => go(in,n).stream
}
// tk: [F[_], O](n: Long)fs2.Pipe[F,O,O]
Stream(1,2,3,4).through(tk(2)).toList
// res33: List[Int] = List(1, 2)
Run Code Online (Sandbox Code Playgroud)
如果我们go从另一个方法调用,它也是安全的吗?
def tk[F[_],O](n: …Run Code Online (Sandbox Code Playgroud) 我想知道如何将 Scala fs2 Stream 转换为字符串,来自 fs2 github 自述文件示例:
def converter[F[_]](implicit F: Sync[F]): F[Unit] = {
val path = "/Users/lorancechen/version_control_project/_unlimited-works/git-server/src/test/resources"
io.file.readAll[F](Paths.get(s"$path/fs.txt"), 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get(s"$path/fs-output.txt")))
.compile.drain
}
// at the end of the universe...
val u: Unit = converter[IO].unsafeRunSync()
Run Code Online (Sandbox Code Playgroud)
如何将结果获取到字符串而不是另一个文件?
我有一个 unordered 流measurements,我想将其分组为固定大小的批次,以便以后可以有效地保留它们:
val measurements = for {
id <- Seq("foo", "bar", "baz")
value <- 1 to 5
} yield (id, value)
fs2.Stream.emits(scala.util.Random.shuffle(measurements)).toVector
Run Code Online (Sandbox Code Playgroud)
也就是说,而不是:
(bar,4)
(foo,5)
(baz,3)
(baz,5)
(baz,4)
(foo,2)
(bar,2)
(foo,4)
(baz,1)
(foo,1)
(foo,3)
(bar,1)
(bar,5)
(bar,3)
(baz,2)
Run Code Online (Sandbox Code Playgroud)
对于等于的批量大小,我希望具有以下结构3:
(bar,[4,2,1])
(foo,[5,2,4])
(baz,[3,5,4])
(baz,[1,2])
(foo,[1,3])
(bar,[5,3])
Run Code Online (Sandbox Code Playgroud)
在 FS2 中是否有一种简单、惯用的方法来实现这一点?我知道有一个groupAdjacentBy函数,但这只会考虑相邻的项目。
我现在在0.10.5。
我的事件流如下:
sealed trait Event
val eventStream: fs2.Stream[IO, Event] = //...
Run Code Online (Sandbox Code Playgroud)
我想将在一分钟内(即每分钟0秒到59秒)收到的事件分组。这听起来很简单fs2
val groupedEventsStream = eventStream groupAdjacentBy {event =>
TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis())
}
Run Code Online (Sandbox Code Playgroud)
问题在于分组功能不纯。它使用currentTimeMillis。我可以这样工作:
stream.evalMap(t => IO(System.currentTimeMillis(), t))
.groupAdjacentBy(t => TimeUnit.MILLISECONDS.toMinutes(t._1))
Run Code Online (Sandbox Code Playgroud)
事实是,我想避免使用元组添加笨拙的样板。还有其他解决方案吗?
或者在这种情况下使用不纯函数还不错吗?
I want to run N nested streams/pipes in parallel and send each element to only one of the nested streams. Balance allows me to do this but I want to route elements with the same "key" to the same nested stream or pipe.
I can't see any functions to do this so I wrote a basic POC which broadcasts each element to every stream. The stream/pipe then filters only the elements it should handle (see below). This seems quite inefficient, …
Monix的作者说将Monix与FS2进行比较
FS2更好的地方:
- 生产者和消费者之间的沟通模型是基于拉动的,有时使实施新运营商变得更加容易
Monix更好的地方:
- 生产者和消费者之间的沟通模型是基于推送的(带有背压),这使其本质上更加有效
很少有问题出现:
我有这样的东西(这是来自https://github.com/typelevel/fs2的一个例子,有我的补充,我用评论标记):
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import fs2.{io, text, Stream}
import java.nio.file.Paths
object Converter extends IOApp {
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.balanceAvailable // my addition
.map ( worker => // my addition
worker // my addition
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
) // my addition
.take(4).parJoinUnbounded // my addition
} …Run Code Online (Sandbox Code Playgroud)