使用 python 的 GCP 数据流。“AttributeError:无法在模块‘dataflow_worker.start’上获取属性‘_JsonSink’

han*_*hih 3 python json google-cloud-platform google-cloud-dataflow

我是 GCP 数据流的新手。

我尝试从 GCP 云存储中将文本文件(一行 JSON 字符串)读取为 JSON 格式,然后根据特定字段的值将其拆分并输出到 GCP 云存储(作为 JSON 字符串文本文件)。

这是我的代码

但是,我在 GCP 数据流上遇到了一些错误:

Traceback (most recent call last):
  File "main.py", line 169, in <module>
    run()
  File "main.py", line 163, in run
    shard_name_template='')
  File "C:\ProgramData\Miniconda3\lib\site-packages\apache_beam\pipeline.py", line 426, in __exit__
    self.run().wait_until_finish()
  File "C:\ProgramData\Miniconda3\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 1346, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 773, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 287, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 410, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute '_JsonSink' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py'>
Run Code Online (Sandbox Code Playgroud)

我可以在本地运行这个脚本,但是当我尝试使用时它失败了 dataflowRunner

请给我一些建议。

附注。apache-beam 版本:2.15.0

[更新1]

我试试@Yueyang Qiu 建议,添加

pipeline_options.view_as(SetupOptions).save_main_session = True
Run Code Online (Sandbox Code Playgroud)

提供的链接说:

此工作流中的 DoFn 依赖于全局上下文(例如,在模块级别导入的模块)

链接支持上述建议。

然而,同样的错误发生了。

所以,我在想我的 _JsonSink 实现(从 filebasedsink.FileBasedSink 继承)是错误的还是需要添加其他东西。

任何意见将不胜感激,谢谢大家!

Val*_*tyn 9

您遇到了一个已知问题,目前(截至 2.17.0 版本),Beam 不支持super()在 Python 3 上的主模块中调用。请查看BEAM-6158 中的可能解决方案。在解决 BEAM-6158 之前,Udi 的答案是解决此问题的好方法,这样您就不必在 Python 2 上运行管道。


小智 6

使用此处的指南,我设法让您的示例运行。

目录结构:

./setup.py
./dataflow_json
./dataflow_json/dataflow_json.py  (no change from your example)
./dataflow_json/__init__.py  (empty file)
./main.py
Run Code Online (Sandbox Code Playgroud)

设置.py:

import setuptools

setuptools.setup(
  name='dataflow_json',
  version='1.0',
  install_requires=[],
  packages=setuptools.find_packages(),
)
Run Code Online (Sandbox Code Playgroud)

主要.py:

from __future__ import absolute_import

from dataflow_json import dataflow_json

if __name__ == '__main__':
    dataflow_json.run()
Run Code Online (Sandbox Code Playgroud)

并且您使用python main.py.

基本上发生的事情是该'--setup_file=./setup.py'标志告诉 Beam 创建一个包并将其安装在 Dataflow 远程工作者上。__init__.pysetuptools 需要该文件将dataflow_json/目录标识为包。