来自 ftp 的 Akka 流,逐行

fr3*_*3ak 1 scala akka akka-stream

我正在尝试使用 alpakka 和 scala 流从 ftp 服务器读取文件。我从 中得到的类型Ftp.fromPath(...)Source[ByteString, Future[IOResult]]. 我想逐行读取文件(它是一个 CSV 文件),但我不知道如何。

我将不胜感激任何帮助。

Vla*_*eev 5

有一种Source[ByteString, _]按行拆分 a 的标准方法,称为Framing.delimiter。它可以像这样使用:

val source: Source[ByteString, Future[IOResult]] = Ftp.fromPath(...)

val splitter = Framing.delimiter(
  ByteString("\n"),
  maximumFrameLength = 1024,
  allowTruncation = true
)

val result: Source[ByteString, Future[IOResult]] = source.via(splitter)
Run Code Online (Sandbox Code Playgroud)

maximumFrameLength参数决定了一行的最大长度;您可以将其设置Int.MaxValue为获得基本上不受限制的行长(尽管如果您的 CSV 行很长可能会很危险),并allowTruncation设置true为允许在 CSV 文件末尾没有新行的情况。

result源,物化时,会产生ByteString对应于每个线S,而不会在其中的换行字符。如果您希望您的文件包含 Windows 行分隔符 ("\r\n"),那么您需要手动修剪这些字符串。