fr3*_*3ak 1 scala akka akka-stream
我正在尝试使用 alpakka 和 scala 流从 ftp 服务器读取文件。我从 中得到的类型Ftp.fromPath(...)是Source[ByteString, Future[IOResult]]. 我想逐行读取文件(它是一个 CSV 文件),但我不知道如何。
我将不胜感激任何帮助。
有一种Source[ByteString, _]按行拆分 a 的标准方法,称为Framing.delimiter。它可以像这样使用:
val source: Source[ByteString, Future[IOResult]] = Ftp.fromPath(...)
val splitter = Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 1024,
allowTruncation = true
)
val result: Source[ByteString, Future[IOResult]] = source.via(splitter)
Run Code Online (Sandbox Code Playgroud)
该maximumFrameLength参数决定了一行的最大长度;您可以将其设置Int.MaxValue为获得基本上不受限制的行长(尽管如果您的 CSV 行很长可能会很危险),并allowTruncation设置true为允许在 CSV 文件末尾没有新行的情况。
的result源,物化时,会产生ByteString对应于每个线S,而不会在其中的换行字符。如果您希望您的文件包含 Windows 行分隔符 ("\r\n"),那么您需要手动修剪这些字符串。