Ste*_*hen 9 nio scala akka reactive-streams
是否有一些代码示例使用org.reactivestreams库来处理使用Java NIO的大数据流(为了获得高性能)?我的目标是分布式处理,所以使用Akka的例子是最好的,但我可以解决这个问题.
似乎大多数(我希望不是所有)scala中的文件读取Source
(非二进制)或直接Java NIO(甚至是类似的东西)的例子都是这样的Files.readAllBytes
.
也许有一个我错过的激活模板?(带有Scala的Akka Streams!除了二进制/ NIO端之外,已经接近我需要的所有内容)
不要scala.collection.immutable.Stream
用来使用这样的文件,原因是它执行memoization - 也就是说,它是懒惰的,它会将整个流缓存(memoized)保存在内存中!
当您考虑"流处理文件"时,这绝对不是您想要的.Scala的Stream工作原理是因为在功能设置中它完全有意义 - 你可以避免一次又一次地反复计算fibbonachi数字,例如,有关详细信息,请参阅ScalaDoc.
Akka Streams提供了Reactive Streams实现,并提供了一个FileIO
可以在这里使用的类(只有在需要时它才会正确地反压并将数据从文件中提取出来,并且其余的流已准备好使用它):
import java.io._
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
object ExampleApp extends App {
implicit val sys = ActorSystem()
implicit val mat = FlowMaterializer()
FileIO.fromPath(Paths.get("/example/file.txt"))
.map(c ? { print(c); c })
.runWith(Sink.onComplete(_ ? { f.close(); sys.shutdown() } ))
}
Run Code Online (Sandbox Code Playgroud)
以下是有关使用Akka Streams 处理IO的更多文档请注意,这是针对Akka 的当前编写版本,因此是2.5.x系列.
希望这可以帮助!
我们实际上使用akka流来处理二进制文件。由于没有任何相关文档,让事情进行起来有点棘手,但这就是我们想到的:
val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte)
val binSource = Source(binStream)
Run Code Online (Sandbox Code Playgroud)
一旦你有了binSource
,这是一个 akka ,Source[Byte]
你就可以继续并开始应用你想要的任何流转换( map
,flatMap
, ,等等......)。transform
此功能利用带有 的Source
伴随对象,传入一个 scala,该 scala 应该延迟读取数据并使其可用于您的转换。apply
Iterable
Stream
编辑
正如康拉德在评论部分指出的那样,流可能会成为大文件的问题,因为它在延迟构建流时会对遇到的元素执行记忆化。如果您不小心,这可能会导致内存不足的情况。但是,如果您查看Stream的文档,就会发现有一个避免在内存中建立记忆的提示:
背诵必须谨慎;如果你不小心的话,你很快就会耗尽大量的内存。原因是 Stream 的记忆化创建了一个非常类似于 scala.collection.immutable.List 的结构。只要有东西抓住了头部,头部就会抓住尾部,所以它会继续递归。另一方面,如果没有任何东西固定在头部(例如,我们使用 def 来定义 Stream),那么一旦不再直接使用它,它就会消失。
因此考虑到这一点,您可以修改我原来的示例,如下所示:
val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binSource = Source(() => binStream(inputStream).iterator)
def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte)
Run Code Online (Sandbox Code Playgroud)
所以这里的想法是构建Stream
via adef
而不是分配给 a val
,然后立即从中获取iterator
并使用它来初始化 Akka Source
。以这种方式设置应该可以避免 momoization 的问题。我针对一个大文件运行旧代码,并能够OutOfMemory
通过foreach
对Source
. 当我将其切换到新代码时,我能够避免这个问题。