继续从多个文件夹中读取文件,然后使用python sdk和dataflow runner将文件名如(filecontents,filename)输出到apache beam中的bigquery.
原本以为我可以为每个文件创建一个pcollection,然后使用文件名映射文件内容.
def read_documents(pipeline):
"""Read the documents at the provided uris and returns (uri, line) pairs."""
pcolls = []
count = 0
with open(TESTIN) as uris:
for uri in uris:
#print str(uri).strip("[]/'")
pcolls.append(
pipeline
| 'Read: uri' + str(uri) >>ReadFromText(str(uri).strip("[]/'"), compression_type = 'gzip')
| 'WithKey: uri' + str(uri) >> beam.Map(lambda v, uri: (v, str(uri).strip("[]")), uri)
)
return pcolls | 'FlattenReadPColls' >> beam.Flatten()
Run Code Online (Sandbox Code Playgroud)
这工作正常,但速度很慢,大约10000个文件后无法在数据流云上工作.如果超过10000个文件,它将遭受破损的管道.
目前正试图从Text.io重载ReadAllFromText函数.Text.io旨在从文件名或模式的pcollection中快速读取大量文件.如果从Google云端存储中读取并且该文件具有内容编码,则此模块中存在错误.谷歌云存储自动枪杀文件并对其进行转码,但由于某些原因,ReadAllFromText无法使用它.您必须更改文件的元数据以删除内容编码,并将ReadAllFromText上的压缩类型设置为gzip.我将此问题包含在内,以防其他人遇到ReadAllFromText问题 https://issues.apache.org/jira/browse/BEAM-1874
我目前的代码看起来像这样
class ReadFromGs(ReadAllFromText):
def __init__(self):
super(ReadFromGs, self).__init__(compression_type="gzip")
def expand(self, pvalue):
files = …Run Code Online (Sandbox Code Playgroud) python google-cloud-platform google-cloud-dataflow apache-beam