Pipeline code spread across multiple files in Apache Beam / Dataflow

Mat*_*rro 6 python google-cloud-dataflow apache-beam

After a long search I haven't found an example of a Dataflow / Beam pipeline that spans multiple files. The Beam documentation does suggest a file structure (under the "Multiple File Dependencies" section), but the Juliaset example they give really only has a single code/source file (plus the main file that calls it). Based on the Juliaset example, I need a similar file structure:

juliaset/__init__.py
juliaset/juliaset.py # actual code
juliaset/some_conf.py
__init__.py
juliaset_main.py
setup.py
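For concreteness, here is a rough sketch of what the package-internal files look like (SomeClass and the config value are placeholder names, not the real code):

# juliaset/some_conf.py (placeholder contents)
SCALE = 42

# juliaset/juliaset.py (sketch of the relative import)
import apache_beam as beam

from . import some_conf  # the shared config module


class SomeClass(beam.DoFn):
    # A DoFn that uses a value from the shared config module.
    def process(self, element):
        yield element * some_conf.SCALE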

Now I want to import some_conf from within juliaset/juliaset.py. This works when run locally, but when run on Dataflow it fails with the following error:

INFO:root:2017-12-15T17:34:09.333Z: JOB_MESSAGE_ERROR: (8cdf3e226105b90a): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 706, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 446, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 247, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 363, in load_session
    module = unpickler.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1133, in load_reduce
    value = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 767, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ImportError: No module named package_name.juliaset.some_conf

A complete working example would be greatly appreciated!

Mar*_*Dam 2

Can you verify that your setup.py contains the following structure:

import setuptools

setuptools.setup(
    name='My Project',
    version='1.0',
    install_requires=[],
    packages=setuptools.find_packages(),
)

Import your module like this: from juliaset.juliaset import SomeClass

And when you invoke your Python script, use python -m juliaset_main (without the .py).

Not sure if you have already tried this, but just to be sure.
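Putting it together, juliaset_main.py could look roughly like this (only a sketch: SomeClass and the pipeline steps are placeholders, and the runner/project options are expected to come from the command line):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from juliaset.juliaset import SomeClass  # import via the package, as suggested above


def main():
    # Picks up --runner, --project, --temp_location, --setup_file, ... from the command line
    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        (p
         | 'Create' >> beam.Create([1, 2, 3])
         | 'Apply' >> beam.ParDo(SomeClass()))


if __name__ == '__main__':
    main()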

  • You need to tell it to use the setup file via the --setup_file pipeline option. (2 upvotes)
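As the comment notes, the Dataflow workers only get the package installed if the job is pointed at setup.py, e.g. python -m juliaset_main --runner DataflowRunner --setup_file ./setup.py plus the usual project/staging options. The same option can also be set in code; a sketch, with the path as a placeholder:

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions()  # or pass the flags explicitly as a list of strings
options.view_as(SetupOptions).setup_file = './setup.py'  # same effect as --setup_file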