han*_*hih 3 python json google-cloud-platform google-cloud-dataflow
我是 GCP 数据流的新手。
我尝试从 GCP 云存储中将文本文件(一行 JSON 字符串)读取为 JSON 格式,然后根据特定字段的值将其拆分并输出到 GCP 云存储(作为 JSON 字符串文本文件)。
这是我的代码
但是,我在 GCP 数据流上遇到了一些错误:
Traceback (most recent call last):
File "main.py", line 169, in <module>
run()
File "main.py", line 163, in run
shard_name_template='')
File "C:\ProgramData\Miniconda3\lib\site-packages\apache_beam\pipeline.py", line 426, in __exit__
self.run().wait_until_finish()
File "C:\ProgramData\Miniconda3\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 1346, in wait_until_finish
(self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 773, in run
self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
pickler.load_session(session_file)
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 287, in load_session
return dill.load_session(file_path)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 410, in load_session
module = unpickler.load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 474, in find_class
return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute '_JsonSink' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py'>
Run Code Online (Sandbox Code Playgroud)
我可以在本地运行这个脚本,但是当我尝试使用时它失败了 dataflowRunner
请给我一些建议。
附注。apache-beam 版本:2.15.0
[更新1]
我试试@Yueyang Qiu 建议,添加
pipeline_options.view_as(SetupOptions).save_main_session = True
Run Code Online (Sandbox Code Playgroud)
提供的链接说:
此工作流中的 DoFn 依赖于全局上下文(例如,在模块级别导入的模块)
此链接支持上述建议。
然而,同样的错误发生了。
所以,我在想我的 _JsonSink 实现(从 filebasedsink.FileBasedSink 继承)是错误的还是需要添加其他东西。
任何意见将不胜感激,谢谢大家!
您遇到了一个已知问题,目前(截至 2.17.0 版本),Beam 不支持super()在 Python 3 上的主模块中调用。请查看BEAM-6158 中的可能解决方案。在解决 BEAM-6158 之前,Udi 的答案是解决此问题的好方法,这样您就不必在 Python 2 上运行管道。
小智 6
使用此处的指南,我设法让您的示例运行。
目录结构:
./setup.py
./dataflow_json
./dataflow_json/dataflow_json.py (no change from your example)
./dataflow_json/__init__.py (empty file)
./main.py
Run Code Online (Sandbox Code Playgroud)
设置.py:
import setuptools
setuptools.setup(
name='dataflow_json',
version='1.0',
install_requires=[],
packages=setuptools.find_packages(),
)
Run Code Online (Sandbox Code Playgroud)
主要.py:
from __future__ import absolute_import
from dataflow_json import dataflow_json
if __name__ == '__main__':
dataflow_json.run()
Run Code Online (Sandbox Code Playgroud)
并且您使用python main.py.
基本上发生的事情是该'--setup_file=./setup.py'标志告诉 Beam 创建一个包并将其安装在 Dataflow 远程工作者上。__init__.pysetuptools 需要该文件将dataflow_json/目录标识为包。
小智 0
您可以尝试save_main_session = True如下设置选项: https: //github.com/apache/beam/blob/a2b0ad14f1525d1a645cb26f5b8ec45692d9d54e/sdks/python/apache_beam/examples/cookbook/coders.py#L88。
| 归档时间: |
|
| 查看次数: |
1797 次 |
| 最近记录: |