从Apache Beam中的多个文件夹中读取文件,并将输出映射到文件名

the*_*nse 8 python google-cloud-platform google-cloud-dataflow apache-beam

继续从多个文件夹中读取文件,然后使用python sdk和dataflow runner将文件名如(filecontents,filename)输出到apache beam中的bigquery.

原本以为我可以为每个文件创建一个pcollection,然后使用文件名映射文件内容.

def read_documents(pipeline):
  """Read the documents at the provided uris and returns (uri, line) pairs."""
  pcolls = []
  count = 0
  with open(TESTIN) as uris:
       for uri in uris:
    #print str(uri).strip("[]/'")
         pcolls.append(
         pipeline
         | 'Read: uri' + str(uri)  >>ReadFromText(str(uri).strip("[]/'"), compression_type = 'gzip')
         | 'WithKey: uri'  + str(uri)   >> beam.Map(lambda v, uri: (v, str(uri).strip("[]")), uri) 
         )
       return pcolls | 'FlattenReadPColls' >> beam.Flatten()
Run Code Online (Sandbox Code Playgroud)

这工作正常,但速度很慢,大约10000个文件后无法在数据流云上工作.如果超过10000个文件,它将遭受破损的管道.

目前正试图从Text.io重载ReadAllFromText函数.Text.io旨在从文件名或模式的pcollection中快速读取大量文件.如果从Google云端存储中读取并且该文件具有内容编码,则此模块中存在错误.谷歌云存储自动枪杀文件并对其进行转码,但由于某些原因,ReadAllFromText无法使用它.您必须更改文件的元数据以删除内容编码,并将ReadAllFromText上的压缩类型设置为gzip.我将此问题包含在内,以防其他人遇到ReadAllFromText问题 https://issues.apache.org/jira/browse/BEAM-1874

我目前的代码看起来像这样

class ReadFromGs(ReadAllFromText):

    def __init__(self):
        super(ReadFromGs, self).__init__(compression_type="gzip")

    def expand(self, pvalue):
        files = self._read_all_files
        return (
            pvalue          
            | 'ReadAllFiles' >> files #self._read_all_files
            | 'Map values' >>  beam.Map( lambda v: (v, filename)) # filename is a placeholder for the input filename that im trying to figure out how to include in the output.
            )
Run Code Online (Sandbox Code Playgroud)

ReadAllFromText包含在Text.io中,并从filebasedsource.py调用ReadAllText并从PTransform继承.

我相信我只是缺少一些简单的遗漏.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py

de1*_*de1 2

正如您所发现的,ReadFromText当前不支持动态文件名,并且您绝对不想为每个 URL 创建单独的步骤。从您的第一句话中,我了解到您希望将文件名和文件内容作为一个项目获取。这意味着您不需要或受益于文件部分的任何流式传输。您可以简单地读取文件内容。就像是:

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


def read_all_from_url(url):
    with FileSystems.open(url) as f:
        return f.read()


def read_from_urls(pipeline, urls):
    return (
        pipeline
        | beam.Create(urls)
        | 'Read File' >> beam.Map(lambda url: (
            url,
            read_all_from_url(url)
        ))
    )
Run Code Online (Sandbox Code Playgroud)

如果您认为元数据有问题,您可以对其进行自定义。输出将是一个元组(url文件内容)。如果您的文件内容非常大,您可能需要根据您的用例稍微不同的方法。