Python/Apache-Beam:如何将文本文件解析为 CSV?

Ril*_*Hun 1 python google-cloud-dataflow apache-beam

我还是 Beam 的新手,但是您究竟如何从 GCS 存储桶中的 CSV 文件中读取数据?我基本上使用 Beam 将这些文件转换为 Pandas 数据帧,然后应用 sklearn 模型来“训练”这些数据。我见过的大多数示例都预先定义了标题,我希望这个 Beam 管道可以推广到标题肯定不同的任何文件。有一个名为beam_utils的库可以完成我想做的事情,但后来我遇到了这个错误:AttributeError: module 'apache_beam.io.fileio' has no attribute 'CompressionTypes'

代码示例:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The error occurs in this import
from beam_utils.sources import CsvFileSource

options = {
    'project': 'my-project',
    'runner:': 'DirectRunner',
    'streaming': False
}

pipeline_options = PipelineOptions(flags=[], **options)

class Printer(beam.DoFn):
    def process(self, element):
        print(element)

with beam.Pipeline(options=pipeline_options) as p:  # Create the Pipeline with the specified options.

    data = (p
            | 'Read File From GCS' >> beam.io.textio.ReadFromText('gs://my-csv-files')
            )

    _ = (data | "Print the data" >> beam.ParDo(Printer()))

result = p.run()
result.wait_until_finish()
Run Code Online (Sandbox Code Playgroud)

Tla*_*zal 5

Apache Beam 模块fileio最近进行了向后不兼容的更改,并且该库beam_utils尚未更新。

我通过@Pablo 提出的问题beam_utils(也由 Pablo 编写)的源代码来复制使用filesystems模块的行为。

下面是使用 Pandas 生成 DataFrame 的两个版本的代码。

用于示例的 csv:

a,b
1,2
3,4
5,6
Run Code Online (Sandbox Code Playgroud)

读取 csv 并创建包含所有内容的 DataFrame

a,b
1,2
3,4
5,6
Run Code Online (Sandbox Code Playgroud)

结果数据帧

   a  b
0  1  2
1  3  4
2  5  6
Run Code Online (Sandbox Code Playgroud)

读取 csv 并在其他转换中创建数据帧

import apache_beam as beam
import pandas as pd
import csv
import io

def create_dataframe(readable_file):

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

    # Read it as csv, you can also use csv.reader
    csv_dict = csv.DictReader(io.TextIOWrapper(gcs_file))

    # Create the DataFrame
    dataFrame = pd.DataFrame(csv_dict)
    print(dataFrame.to_string())

p = beam.Pipeline()
(p | beam.Create(['gs://my-bucket/my-file.csv'])
   | beam.FlatMap(create_dataframe)
)

p.run()
Run Code Online (Sandbox Code Playgroud)

结果数据帧

   a  b
0  1  2
   a  b
0  3  4
   a  b
0  5  6
Run Code Online (Sandbox Code Playgroud)