WIT*_*WIT 7 python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator
当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?
我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?
Composer 中可用的最新 Airflow 版本是 1.10.2 或 1.10.3(取决于地区)。到那时,这些操作员都在该contrib
部分。
关注如何使用 Composer 运行 Python 3 Dataflow 作业,您需要发布新版本。但是,如果您需要即时解决方案,您可以尝试向后移植此修复程序。
在这种情况下,我定义了一个DataFlow3Hook
扩展法线DataFlowHook
但它没有python2
在start_python_dataflow
方法中硬编码:
class DataFlow3Hook(DataFlowHook):
def start_python_dataflow(
...
py_interpreter: str = "python3"
):
...
self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
label_formatter)
Run Code Online (Sandbox Code Playgroud)
然后我们将自定义DataFlowPython3Operator
调用新钩子:
class DataFlowPython3Operator(DataFlowPythonOperator):
def execute(self, context):
...
hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
...
hook.start_python_dataflow(
self.job_name, formatted_options,
self.py_file, self.py_options, py_interpreter="python3")
Run Code Online (Sandbox Code Playgroud)
最后,在我们的 DAG 中,我们只使用 new 运算符:
task = DataFlowPython3Operator(
py_file='/home/airflow/gcs/data/main.py',
task_id=JOB_NAME,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
在此处查看完整代码。作业使用 Python 3.6 运行:
使用的环境详细信息和依赖项(Beam 作业是一个最小示例):
softwareConfig:
imageVersion: composer-1.8.0-airflow-1.10.3
pypiPackages:
apache-beam: ==2.15.0
google-api-core: ==1.14.3
google-apitools: ==0.5.28
google-cloud-core: ==1.0.3
pythonVersion: '3'
Run Code Online (Sandbox Code Playgroud)
让我知道这是否适合您。如果是这样,我建议将代码移动到插件中以提高代码可读性,并在 DAG 之间重用它。
归档时间: |
|
查看次数: |
159 次 |
最近记录: |