Nag*_*han 5 java google-cloud-platform google-cloud-dataflow apache-beam
我想跳过 CSV 文件中的标题行。截至目前,我正在手动删除标头,然后将其加载到谷歌存储。
下面是我的代码:
PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr = c.element().split(",");
ClassFinance fin = new ClassFinance();
fin.setBeneficiaryFinance(strArr[0]);
fin.setCatlibCode(strArr[1]);
fin.set_rNR_(Double.valueOf(strArr[2]));
fin.set_rNCS_(Double.valueOf(strArr[3]));
fin.set_rCtb_(Double.valueOf(strArr[4]));
fin.set_rAC_(Double.valueOf(strArr[5]));
c.output(fin);
}
}));
Run Code Online (Sandbox Code Playgroud)
我已经检查了 stackoverflow 中的现有问题,但我认为它没有希望:Skipping header rows - is it possible with Cloud DataFlow?
有什么帮助吗?
编辑:我尝试过类似下面的方法并且它有效:
PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr2 = c.element().split(",");
String header = Arrays.toString(strArr2);
ClassFinance fin = new ClassFinance();
if(header.contains("Beneficiary"))
System.out.println("Header");
else {
fin.setBeneficiaryFinance(strArr2[0].trim());
fin.setCatlibCode(strArr2[1].trim());
fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
c.output(fin);
}
}
}));
Run Code Online (Sandbox Code Playgroud)
您共享的较旧的 Stack Overflow 帖子(跳过标题行 - 是否可以使用 Cloud DataFlow?)确实包含您问题的答案。
\n\n尽管 Apache Beam JIRA 问题跟踪器BEAM-123中有一个开放的功能请求,但该选项当前在 Apache Beam SDK 中不可用。请注意,截至撰写本文时,此功能请求仍处于开放状态且尚未解决,并且这种情况已经持续了 2 年。然而,看起来在这个意义上正在做一些努力,并且该问题的最新更新是从 2018 年 2 月开始的,所以我建议您保持该 JIRA 问题的更新,因为它最后移至该组件,并且它那里可能会受到更多关注。sdk-java-core
考虑到这些信息,我想说您正在使用的方法(在将文件上传到 GCS 之前删除标头)是您的最佳选择。我不会手动执行此操作,因为您可以轻松编写脚本并自动执行删除标头\xe2\x9f\xb6上传文件过程。
\n\n编辑:
\n\n我已经能够使用DoFn. 它可能不是最优雅的解决方案(我自己不是 Apache Beam 专家),但它确实有效,并且您可以根据您的需求进行调整。它要求您事先知道正在上传的 CSV 文件的标题(因为它将按元素内容进行过滤),但同样,将此作为您可以根据需要进行修改的模板:
public class RemoveCSVHeader {\n // The Filter class\n static class FilterCSVHeaderFn extends DoFn<String, String> {\n String headerFilter;\n\n public FilterCSVHeaderFn(String headerFilter) {\n this.headerFilter = headerFilter;\n }\n\n @ProcessElement\n public void processElement(ProcessContext c) {\n String row = c.element();\n // Filter out elements that match the header\n if (!row.equals(this.headerFilter)) {\n c.output(row);\n }\n }\n }\n\n // The main class\n public static void main(String[] args) throws IOException {\n PipelineOptions options = PipelineOptionsFactory.create();\n Pipeline p = Pipeline.create(options);\n\n PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));\n\n String header = "col1,col2,col3,col4";\n\n vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))\n .apply(TextIO.write().to("out"));\n\n p.run().waitUntilFinish();\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n