M.K*_*.K. 11 scala akka akka-stream
我正在读一个csv文件.我正在使用Akka Streams执行此操作,以便我可以创建要在每一行执行的操作的图表.我已经启动并运行了以下玩具示例.
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("MyAkkaSystem")
implicit val materializer = ActorMaterializer()
val source = akka.stream.scaladsl.Source.fromIterator(Source.fromFile("a.csv").getLines)
val sink = Sink.foreach(println)
source.runWith(sink)
}
Run Code Online (Sandbox Code Playgroud)
这两种Source类型对我来说并不容易.这是惯用的还是有更好的方法来写这个?
小智 17
实际上,akka-streams提供了直接从文件读取的功能.
FileIO.fromPath(Paths.get("a.csv"))
.via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
.runForeach(println)
Run Code Online (Sandbox Code Playgroud)
这里,runForeach方法是打印线条.如果您有适当Sink的处理这些行,请使用它而不是此函数.例如,如果要分割线条'并打印其中的总字数:
val sink: Sink[String] = Sink.foreach(x => println(x.split(",").size))
FileIO.fromPath(Paths.get("a.csv"))
.via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
.to(sink)
.run()
Run Code Online (Sandbox Code Playgroud)
Jef*_*ung 13
使用Akka Streams读取CSV文件的惯用方法是使用Alpakka CSV连接器.以下示例读取CSV文件,将其转换为列名称的映射(假定为文件中的第一行)和ByteString值,将ByteString值转换为String值,并打印每行:
FileIO.fromPath(Paths.get("a.csv"))
.via(CsvParsing.lineScanner())
.via(CsvToMap.toMap())
.map(_.mapValues(_.utf8String))
.runForeach(println)
Run Code Online (Sandbox Code Playgroud)
尝试这个:
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
object ReadStreamApp extends App {
implicit val actorSystem = ActorSystem()
import actorSystem.dispatcher
implicit val flowMaterializer = ActorMaterializer()
val logFile = Paths.get("src/main/resources/a.csv")
val source = FileIO.fromPath(logFile)
val flow = Framing
.delimiter(ByteString(System.lineSeparator()), maximumFrameLength = 512, allowTruncation = true)
.map(_.utf8String)
val sink = Sink.foreach(println)
source
.via(flow)
.runWith(sink)
.andThen {
case _ =>
actorSystem.terminate()
Await.ready(actorSystem.whenTerminated, 1 minute)
}
}
Run Code Online (Sandbox Code Playgroud)
是的,没关系,因为这些是不同的Source。但是如果您不喜欢,scala.io.Source您可以自己读取文件(有时我们必须这样做,例如源csv文件被压缩),然后使用给定的方法解析它,InputStream如下所示
StreamConverters.fromInputStream(() => input)
.via(Framing.delimiter(ByteString("\n"), 4096))
.map(_.utf8String)
.collect { line =>
line
}
Run Code Online (Sandbox Code Playgroud)
话虽如此,请考虑Apache Commons CSV与 akka-stream 一起使用。您最终可能会编写更少的代码:)
| 归档时间: |
|
| 查看次数: |
8827 次 |
| 最近记录: |