在 Cloud Dataflow 中进行 ETL 和解析 CSV 文件

chi*_*sby 3 csv google-cloud-dataflow

我是云数据流和 Java 的新手,所以我希望这是正确的问题。

我有一个 csv 文件,其中有 n 个列和行,可以是字符串、整数或时间戳。我需要为每一列创建一个新的 PCollection 吗?

我在示例中找到的大多数文档都类似于:

PCollection<String> data = p.apply(TextIO.Read.from("gs://abc/def.csv"));
Run Code Online (Sandbox Code Playgroud)

但对我来说,将整个 csv 文件作为字符串导入是没有意义的。我在这里缺少什么?我应该如何设置我的 PCollections?

xia*_*lin 5

line.split(",");
Run Code Online (Sandbox Code Playgroud)

如果行数据如下所示,则 String.split 没有意义:

a,b,c,"我们有一个包含逗号的字符串",d,e

处理 csv 数据的一种属性方法是导入 csv 库:

        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>3.7</version>
        </dependency>
Run Code Online (Sandbox Code Playgroud)

并在 ParDo 中使用以下代码:

public void processElement(ProcessContext c) throws IOException {
    String line = c.element();
    CSVParser csvParser = new CSVParser();
    String[] parts = csvParser.parseLine(line);
}
Run Code Online (Sandbox Code Playgroud)