如何使用Reactive Streams进行NIO二进制处理?

Ste*_*hen 9 nio scala akka reactive-streams

是否有一些代码示例使用org.reactivestreams库来处理使用Java NIO的大数据流(为了获得高性能)?我的目标是分布式处理,所以使用Akka的例子是最好的,但我可以解决这个问题.

似乎大多数(我希望不是所有)scala中的文件读取Source(非二进制)或直接Java NIO(甚至是类似的东西)的例子都是这样的Files.readAllBytes.

也许有一个我错过的激活模板?(带有Scala的Akka Streams!除了二进制/ NIO端之外,已经接近我需要的所有内容)

Kon*_*ski 9

不要scala.collection.immutable.Stream用来使用这样的文件,原因是它执行memoization - 也就是说,它是懒惰的,它会将整个流缓存(memoized)保存在内存中!

当您考虑"流处理文件"时,这绝对不是您想要的.Scala的Stream工作原理是因为在功能设置中它完全有意义 - 你可以避免一次又一次地反复计算fibbonachi数字,例如,有关详细信息,请参阅ScalaDoc.

Akka Streams提供了Reactive Streams实现,并提供了一个FileIO可以在这里使用的类(只有在需要时它才会正确地反压并将数据从文件中提取出来,并且其余的流已准备好使用它):

import java.io._
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

object ExampleApp extends App {


  implicit val sys = ActorSystem()
  implicit val mat = FlowMaterializer()

  FileIO.fromPath(Paths.get("/example/file.txt"))
    .map(c ? { print(c); c })
    .runWith(Sink.onComplete(_ ? { f.close(); sys.shutdown() } ))
}
Run Code Online (Sandbox Code Playgroud)

以下是有关使用Akka Streams 处理IO的更多文档请注意,这是针对Akka 的当前编写版本,因此是2.5.x系列.

希望这可以帮助!


cmb*_*ter 4

我们实际上使用akka流来处理二进制文件。由于没有任何相关文档,让事情进行起来有点棘手,但这就是我们想到的:

val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte) 
val binSource = Source(binStream)
Run Code Online (Sandbox Code Playgroud)

一旦你有了binSource,这是一个 akka ,Source[Byte]你就可以继续并开始应用你想要的任何流转换( mapflatMap, ,等等......)。transform此功能利用带有 的Source伴随对象,传入一个 scala,该 scala 应该延迟读取数据并使其可用于您的转换。applyIterableStream

编辑

正如康拉德在评论部分指出的那样,流可能会成为大文件的问题,因为它在延迟构建流时会对遇到的元素执行记忆化。如果您不小心,这可能会导致内存不足的情况。但是,如果您查看Stream的文档,就会发现有一个避免在内存中建立记忆的提示:

背诵必须谨慎;如果你不小心的话,你很快就会耗尽大量的内存。原因是 Stream 的记忆化创建了一个非常类似于 scala.collection.immutable.List 的结构。只要有东西抓住了头部,头部就会抓住尾部,所以它会继续递归。另一方面,如果没有任何东西固定在头部(例如,我们使用 def 来定义 Stream),那么一旦不再直接使用它,它就会消失。

因此考虑到这一点,您可以修改我原来的示例,如下所示:

val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))     
val binSource = Source(() => binStream(inputStream).iterator)

def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte)
Run Code Online (Sandbox Code Playgroud)

所以这里的想法是构建Streamvia adef而不是分配给 a val,然后立即从中获取iterator并使用它来初始化 Akka Source。以这种方式设置应该可以避免 momoization 的问题。我针对一个大文件运行旧代码,并能够OutOfMemory通过foreachSource. 当我将其切换到新代码时,我能够避免这个问题。

  • scala.collection.immutable.Stream 的使用在这里相当危险 - 它采用记忆化(!)(请参阅文档http://www.scala-lang.org/api/current/index.html#scala.collection.immutable。 Stream ),因此您最终会将整个文件存储在内存中,而不是通过流式传输(!)。 (2认同)
  • 很好的更新,暴露输入流的迭代器效果很好。请记住在流完成时也关闭资源。 (2认同)