使用Dataflow读取CSV标头

Max*_*ian 6 google-cloud-dataflow apache-beam

我有一个CSV文件,我不知道列名提前.我需要在Google Dataflow中进行一些转换后以JSON格式输出数据.

采用标题行并将标签渗透到所有行的最佳方法是什么?

例如:

a,b,c
1,2,3
4,5,6
Run Code Online (Sandbox Code Playgroud)

......变得(大约):

{a:1, b:2, c:3}
{a:4, b:5, c:6}
Run Code Online (Sandbox Code Playgroud)

rob*_*oul 8

您应该实现自定义FileBasedSource(类似于TextIO.TextSource),它将读取第一行并存储头数据

    @Override
    protected void startReading(final ReadableByteChannel channel)
    throws IOException {
        lineReader = new LineReader(channel);

        if (lineReader.readNextLine()) {
            final String headerLine = lineReader.getCurrent().trim();
            header = headerLine.split(",");
            readingStarted = true;
        }
    }
Run Code Online (Sandbox Code Playgroud)

而后者,在读取其他行时会将其添加到当前行数据中:

    @Override
    protected boolean readNextRecord() throws IOException {
        if (!lineReader.readNextLine()) {
            return false;
        }

        final String line = lineReader.getCurrent();
        final String[] data = line.split(",");

        // assumes all lines are valid
        final StringBuilder record = new StringBuilder();
        for (int i = 0; i < header.length; i++) {
            record.append(header[i]).append(":").append(data[i]).append(", ");
        }

        currentRecord = record.toString();
        return true;
    }
Run Code Online (Sandbox Code Playgroud)

我已经实现了一个快速(完整)的解决方案,可以在github上找到.我还添加了一个数据流单元测试来演示阅读:

@Test
public void test_reading() throws Exception {
    final File file =
            new File(getClass().getResource("/sample.csv").toURI());
    assertThat(file.exists()).isTrue();

    final Pipeline pipeline = TestPipeline.create();

    final PCollection<String> output =
            pipeline.apply(Read.from(CsvWithHeaderFileSource.from(file.getAbsolutePath())));

    DataflowAssert
            .that(output)
            .containsInAnyOrder("a:1, b:2, c:3, ", "a:4, b:5, c:6, ");

    pipeline.run();
}
Run Code Online (Sandbox Code Playgroud)

哪里sample.csv有以下内容:

a,b,c
1,2,3
4,5,6
Run Code Online (Sandbox Code Playgroud)