没有名为airfow.gcp的模块-如何运行使用python3 / beam 2.15的数据流作业?

WIT*_*WIT 7 python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator

当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?

我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?

Gui*_*ins 6

Composer 中可用的最新 Airflow 版本是 1.10.2 或 1.10.3(取决于地区)。到那时,这些操作员都在该contrib部分。

关注如何使用 Composer 运行 Python 3 Dataflow 作业,您需要发布新版本。但是,如果您需要即时解决方案,您可以尝试向后移植此修复程序

在这种情况下,我定义了一个DataFlow3Hook扩展法线DataFlowHook但它没有python2start_python_dataflow方法中硬编码:

class DataFlow3Hook(DataFlowHook):
    def start_python_dataflow(
        ...
        py_interpreter: str = "python3"
    ):

        ...

        self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
                             label_formatter)
Run Code Online (Sandbox Code Playgroud)

然后我们将自定义DataFlowPython3Operator调用新钩子:

class DataFlowPython3Operator(DataFlowPythonOperator):

    def execute(self, context):
        ...
        hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to,
                            poll_sleep=self.poll_sleep)
        ...
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options, py_interpreter="python3")
Run Code Online (Sandbox Code Playgroud)

最后,在我们的 DAG 中,我们只使用 new 运算符:

task = DataFlowPython3Operator(
    py_file='/home/airflow/gcs/data/main.py',
    task_id=JOB_NAME,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

在此处查看完整代码。作业使用 Python 3.6 运行:

在此处输入图片说明

使用的环境详细信息和依赖项(Beam 作业是一个最小示例):

softwareConfig:
  imageVersion: composer-1.8.0-airflow-1.10.3
  pypiPackages:
    apache-beam: ==2.15.0
    google-api-core: ==1.14.3
    google-apitools: ==0.5.28
    google-cloud-core: ==1.0.3
  pythonVersion: '3'
Run Code Online (Sandbox Code Playgroud)

让我知道这是否适合您。如果是这样,我建议将代码移动到插件中以提高代码可读性,并在 DAG 之间重用它。