在 python Apache Beam 中打开一个 gzip 文件

ags*_*lid 5 python dataflow google-cloud-dataflow apache-beam

目前是否可以使用 Apache Beam 从 python 中的 gzip 文件中读取?我的管道正在使用以下代码行从 gcs 中提取 gzip 文件:

beam.io.Read(beam.io.TextFileSource('gs://bucket/file.gz', compression_type='GZIP')) 
Run Code Online (Sandbox Code Playgroud)

但我收到此错误:

UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte
Run Code Online (Sandbox Code Playgroud)

我们在 python beam 源代码中注意到,在写入接收器时似乎处理了压缩文件。 https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445

更详细的追溯:

Traceback (most recent call last):
  File "beam-playground.py", line 11, in <module>
    p.run() 
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 103, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
    self.runner.run_transform(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
    return m(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 99, in func_wrapper
    func(self, pvalue, *args, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 258, in run_Read
    read_values(reader)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 245, in read_values
    read_result = [GlobalWindows.windowed_value(e) for e in reader]
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/fileio.py", line 807, in __iter__
    yield self.source.coder.decode(line)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/coders.py", line 187, in decode
    return value.decode('utf-8')
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte
Run Code Online (Sandbox Code Playgroud)

Ken*_*les 4

更新:TextIOPython SDK 现在支持读取压缩文件。

目前TextIOPython SDK实际上并不支持读取压缩文件。