Google Cloud Dataflow: writing CSV from dictionaries

ree*_*106 5 python google-cloud-dataflow apache-beam

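I have a dictionary of values that I want to write to GCS as a valid .CSV file using the Python SDK. I can write the dictionary out as a newline-delimited text file, but I can't seem to find an example that converts a dictionary into a valid .CSV. Can anyone suggest the best way to generate CSV in a Dataflow pipeline? The answers to this question address reading from CSV files, but don't really address writing them. I realize a CSV file is just a text file with rules, but I'm still struggling to convert a dictionary of data into CSV that can be written with WriteToText.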

Here is a simple example of the dictionaries I want to convert to CSV:

test_input = [{'label': 1, 'text': 'Here is a sentence'},
              {'label': 2, 'text': 'Another sentence goes here'}]


test_input  | beam.io.WriteToText(path_to_gcs)

The code above produces a text file with each dictionary on its own line. Is there any functionality in Apache Beam I can take advantage of (similar to csv.DictWriter)?

And*_* Mo 5

Generally, you will need to write a function that converts each original dict data element into a CSV-formatted string representation.

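That function can be written as a DoFn that you apply to your Beam PCollection of data; it converts each element of the collection into the desired format. You apply the DoFn to your PCollection via ParDo. You can also wrap the DoFn in a more user-friendly PTransform.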

You can learn more about this process in the Beam Programming Guide.
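A minimal end-to-end sketch of this idea, assuming Python 3 and an installed apache_beam (plus its GCP extras if the output prefix is a gs:// path like the question's path_to_gcs). The names COLUMNS, dict_to_csv_line and run are hypothetical. Instead of a full DoFn class it uses beam.Map, a convenience transform that applies a plain function to each element through a ParDo; the header argument of WriteToText writes the column names at the top of the output.

```python
import csv
import io

# Assumed column order for the question's example dictionaries (hypothetical).
COLUMNS = ('label', 'text')


def dict_to_csv_line(element, columns=COLUMNS):
    """Format one dict as a single CSV line without a trailing line break."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=columns)
    writer.writerow(element)
    # WriteToText appends its own newline, so drop the writer's '\r\n'.
    return buf.getvalue().rstrip('\r\n')


def run(output_prefix):
    """Build and run a small pipeline; requires apache_beam to be installed."""
    # Imported here so dict_to_csv_line can be used without Beam installed.
    import apache_beam as beam

    test_input = [{'label': 1, 'text': 'Here is a sentence'},
                  {'label': 2, 'text': 'Another sentence goes here'}]

    with beam.Pipeline() as p:
        (p
         | 'Create' >> beam.Create(test_input)
         # beam.Map wraps the plain function in a ParDo under the hood.
         | 'ToCSV' >> beam.Map(dict_to_csv_line)
         | 'Write' >> beam.io.WriteToText(output_prefix,
                                          file_name_suffix='.csv',
                                          header=','.join(COLUMNS)))
```

Calling run(path_to_gcs) would write sharded .csv files under that prefix using the local DirectRunner; to run on Dataflow you would pass the usual PipelineOptions to beam.Pipeline. As far as I know, WriteToText writes the header into every shard, so each file is a valid CSV on its own; num_shards=1 forces a single file at the cost of write parallelism.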

Here is a simple non-Beam example whose logic translates directly to a DoFn:

# Our example list of dictionary elements
test_input = [{'label': 1, 'text': 'Here is a sentence'},
             {'label': 2, 'text': 'Another sentence goes here'}]

def convert_my_dict_to_csv_record(input_dict, column_order=('label', 'text')):
    """ Turns dictionary values into a comma-separated value formatted string,
        emitting the columns in the order given by column_order
        (relying on dict key order is fragile across Python versions).
        Note: no CSV quoting is applied, so values containing commas, quotes
        or line breaks will not produce valid CSV. """
    return ','.join(str(input_dict[key]) for key in column_order)

# Our converted list of elements
converted_test_input = [convert_my_dict_to_csv_record(element) for element in test_input]

converted_test_input looks like this:

['1,Here is a sentence', '2,Another sentence goes here']
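A plain ','.join works for these inputs, but it applies none of the CSV quoting rules, which is exactly where "text with rules" matters. The short sketch below uses a hypothetical row whose text contains a comma and compares a naive join with Python's standard csv.writer (default excel dialect), then reads both lines back with csv.reader to show the column counts.

```python
import csv
import io

row = {'label': 3, 'text': 'Hello, world'}  # hypothetical row whose text contains a comma

# Naive join: the comma inside the text becomes an extra column separator.
naive = ','.join(str(row[k]) for k in ('label', 'text'))

# csv.writer quotes fields that contain the delimiter, the quote character,
# or line breaks (QUOTE_MINIMAL in the default excel dialect).
buf = io.StringIO()
csv.writer(buf).writerow([row['label'], row['text']])
quoted = buf.getvalue().rstrip('\r\n')

print(naive)   # 3,Hello, world
print(quoted)  # 3,"Hello, world"

# Parsing the lines back: the naive line yields three columns, the quoted one two.
print(next(csv.reader([naive])))
print(next(csv.reader([quoted])))
```

This is why the DictWriter-based version that follows is preferable for real data: the csv module handles quoting and escaping for you.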

Beam DictToCSV DoFn and PTransform example using DictWriter

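from csv import DictWriter
from csv import excel
from io import StringIO  # Python 3; replaces Python 2's cStringIO

from apache_beam import DoFn, ParDo, PTransform

...

def _dict_to_csv(element, column_order, missing_val='', discard_extras=True, dialect=excel):
    """ Additional properties for delimiters, escape chars, etc. can be set via an instance of csv.Dialect
        Note: uses io.StringIO, so unicode text is supported on Python 3
        (the original Python 2 version, based on cStringIO, did not support unicode)
    """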

    buf = StringIO()

    writer = DictWriter(buf,
                        fieldnames=column_order,
                        restval=missing_val,
                        extrasaction=('ignore' if discard_extras else 'raise'),
                        dialect=dialect)
    writer.writerow(element)

    return buf.getvalue().rstrip(dialect.lineterminator)


class _DictToCSVFn(DoFn):
    """ Converts a Dictionary to a CSV-formatted String

        column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
        missing_val: The value to be written when a named field from `column_order` is not found in the input element
        discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
        dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect

    """

    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
        super().__init__()  # initialize Beam's DoFn base class
        self._column_order = column_order
        self._missing_val = missing_val
        self._discard_extras = discard_extras
        self._dialect = dialect

    def process(self, element, *args, **kwargs):
        result = _dict_to_csv(element,
                              column_order=self._column_order,
                              missing_val=self._missing_val,
                              discard_extras=self._discard_extras,
                              dialect=self._dialect)

        return [result,]

class DictToCSV(PTransform):
    """ Transforms a PCollection of Dictionaries to a PCollection of CSV-formatted Strings

        column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
        missing_val: The value to be written when a named field from `column_order` is not found in an input element
        discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
        dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect

    """

    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
        super().__init__()  # initialize Beam's PTransform base class (label, side inputs, etc.)
        self._column_order = column_order
        self._missing_val = missing_val
        self._discard_extras = discard_extras
        self._dialect = dialect

    def expand(self, pcoll):
        return pcoll | ParDo(_DictToCSVFn(column_order=self._column_order,
                                          missing_val=self._missing_val,
                                          discard_extras=self._discard_extras,
                                          dialect=self._dialect)
                             )

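To use the example, put your test_input into a PCollection and apply the DictToCSV PTransform to it; you can then use the resulting PCollection as the input to WriteToText. Note that you must provide, via the column_order argument, a list or tuple of column names corresponding to the keys of your dictionary input elements; the columns of the resulting CSV-formatted strings will appear in the order of the names you provide. Finally, unicode support depends on the StringIO buffer used by _dict_to_csv: Python 2's cStringIO does not support unicode, whereas Python 3's io.StringIO does.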
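To see what column_order, missing_val and discard_extras actually do, here is a stdlib-only Python 3 sketch that mirrors _dict_to_csv outside of Beam. The name dict_to_csv is hypothetical and the dialect parameter is dropped for brevity (the excel default is used); missing_val maps to DictWriter's restval and discard_extras to its extrasaction.

```python
import csv
import io


def dict_to_csv(element, column_order, missing_val='', discard_extras=True):
    """Hypothetical Python 3 mirror of _dict_to_csv using the excel dialect."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf,
                            fieldnames=column_order,
                            restval=missing_val,  # written for keys missing from element
                            extrasaction='ignore' if discard_extras else 'raise')
    writer.writerow(element)
    return buf.getvalue().rstrip('\r\n')


columns = ('label', 'text')

# Column order comes from column_order, not from the dict's own key order.
print(dict_to_csv({'text': 'Here is a sentence', 'label': 1}, columns))
# A missing 'text' key is filled with missing_val.
print(dict_to_csv({'label': 2}, columns, missing_val='N/A'))
# Extra keys are dropped silently when discard_extras=True ...
print(dict_to_csv({'label': 3, 'text': 'x', 'extra': 'y'}, columns))
# ... and make DictWriter raise ValueError when discard_extras=False.
try:
    dict_to_csv({'label': 3, 'text': 'x', 'extra': 'y'}, columns, discard_extras=False)
except ValueError as err:
    print('ValueError:', err)
# Non-ASCII text works with io.StringIO; the comma forces quoting.
print(dict_to_csv({'label': 4, 'text': 'café, naïve'}, columns))
```

Whether to drop or reject unexpected keys is a design choice: 'ignore' keeps a pipeline running on messy input, while 'raise' surfaces schema drift early.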