在 Apache Beam 中读取 CSV 文件时跳过标头

Nag*_*han 5 java google-cloud-platform google-cloud-dataflow apache-beam

我想跳过 CSV 文件中的标题行。截至目前,我正在手动删除标头,然后将其加载到谷歌存储。

下面是我的代码:

PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));        
    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] strArr = c.element().split(",");
            ClassFinance fin = new ClassFinance();
            fin.setBeneficiaryFinance(strArr[0]);
            fin.setCatlibCode(strArr[1]);
            fin.set_rNR_(Double.valueOf(strArr[2]));
            fin.set_rNCS_(Double.valueOf(strArr[3]));
            fin.set_rCtb_(Double.valueOf(strArr[4]));
            fin.set_rAC_(Double.valueOf(strArr[5]));
            c.output(fin);
        }
    }));
Run Code Online (Sandbox Code Playgroud)

我已经检查了 stackoverflow 中的现有问题,但我认为它没有希望:Skipping header rows - is it possible with Cloud DataFlow?

有什么帮助吗?

编辑:我尝试过类似下面的方法并且它有效:

PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));       

    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {  
            String[] strArr2 = c.element().split(",");
            String header = Arrays.toString(strArr2);
            ClassFinance fin = new ClassFinance();

                if(header.contains("Beneficiary"))
                System.out.println("Header");
                else {
            fin.setBeneficiaryFinance(strArr2[0].trim());
            fin.setCatlibCode(strArr2[1].trim());
            fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
            fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
            fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
            fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
            c.output(fin);
            }
        }
    }));
Run Code Online (Sandbox Code Playgroud)

dse*_*sto 6

您共享的较旧的 Stack Overflow 帖子(跳过标题行 - 是否可以使用 Cloud DataFlow?)确实包含您问题的答案。

\n\n

尽管 Apache Beam JIRA 问题跟踪器BEAM-123中有一个开放的功能请求,但该选项当前在 Apache Beam SDK 中不可用。请注意,截至撰写本文时,此功能请求仍处于开放状态且尚未解决,并且这种情况已经持续了 2 年。然而,看起来在这个意义上正在做一些努力,并且该问题的最新更新是从 2018 年 2 月开始的,所以我建议您保持该 JIRA 问题的更新,因为它最后移至该组件,并且它那里可能会受到更多关注。sdk-java-core

\n\n

考虑到这些信息,我想说您正在使用的方法(在将文件上传到 GCS 之前删除标头)是您的最佳选择。我不会手动执行此操作,因为您可以轻松编写脚本并自动执行删除标头\xe2\x9f\xb6上传文件过程。

\n\n
\n\n

编辑:

\n\n

我已经能够使用DoFn. 它可能不是最优雅的解决方案(我自己不是 Apache Beam 专家),但它确实有效,并且您可以根据您的需求进行调整。它要求您事先知道正在上传的 CSV 文件的标题(因为它将按元素内容进行过滤),但同样,将此作为您可以根据需要进行修改的模板:

\n\n
public class RemoveCSVHeader {\n  // The Filter class\n  static class FilterCSVHeaderFn extends DoFn<String, String> {\n    String headerFilter;\n\n    public FilterCSVHeaderFn(String headerFilter) {\n      this.headerFilter = headerFilter;\n    }\n\n    @ProcessElement\n    public void processElement(ProcessContext c) {\n      String row = c.element();\n      // Filter out elements that match the header\n      if (!row.equals(this.headerFilter)) {\n        c.output(row);\n      }\n    }\n  }\n\n  // The main class\n  public static void main(String[] args) throws IOException {\n    PipelineOptions options = PipelineOptionsFactory.create();\n    Pipeline p = Pipeline.create(options);\n\n    PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));\n\n    String header = "col1,col2,col3,col4";\n\n    vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))\n        .apply(TextIO.write().to("out"));\n\n    p.run().waitUntilFinish();\n  }\n}\n
Run Code Online (Sandbox Code Playgroud)\n