我有这样的东西(这是来自https://github.com/typelevel/fs2的一个例子,有我的补充,我用评论标记):
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import fs2.{io, text, Stream}
import java.nio.file.Paths
object Converter extends IOApp {
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.balanceAvailable // my addition
.map ( worker => // my addition
worker // my addition
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
) // my addition
.take(4).parJoinUnbounded // my addition
} …Run Code Online (Sandbox Code Playgroud) 我刚刚开始我的 fs2 流冒险。我想要实现的是读取一个文件(一个大文件,这就是我使用 fs2 的原因),转换它并将结果写入两个不同的文件(基于某些谓词)。一些代码(来自https://github.com/typelevel/fs2),以及我的评论:
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
/* instead of the last line I want something like this:
.through(<write temperatures higher than 10 to one file, the rest to the other one>)
*/
}
Run Code Online (Sandbox Code Playgroud)
最有效的方法是什么?显而易见的解决方案是使用两个具有不同过滤器的流,但效率很低(将有两次通过)。