Using PythonVirtualenvOperator in an Apache Airflow 2.0 TaskFlow DAG

4 airflow

I'm trying to figure out how to use PythonVirtualenvOperator inside a DAG that I'm building with the TaskFlow API in Apache Airflow 2.0.1.

Sample code

My sample DAG looks like this:

from time import sleep
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonVirtualenvOperator

default_args = {
  'owner': 'airflow'
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags = ['trevor'])
def trevor_dag():
  @task()
  def task1():
    sleep(2)
    return 'hi'
  t_task1 = PythonVirtualenvOperator(python_callable=task1, system_site_packages=False, requirements=['boto3', 'apache-airflow'], task_id='trevor')
  

mydag = trevor_dag()

Expected result

The DAG runs successfully inside a Python virtual environment.

Actual result

Even though I install the apache-airflow package into the virtual environment, the TaskFlow API names (such as task) apparently can't be found there:

[2021-04-06 22:44:19,324] {process_utils.py:135} INFO - Executing cmd: /tmp/venvgjg2bsg6/bin/python /tmp/venvgjg2bsg6/script.py /tmp/venvgjg2bsg6/script.in /tmp/venvgjg2bsg6/script.out /tmp/venvgjg2bsg6/string_args.txt
[2021-04-06 22:44:19,329] {process_utils.py:137} INFO - Output:
[2021-04-06 22:44:19,722] {process_utils.py:141} INFO - Traceback (most recent call last):
[2021-04-06 22:44:19,723] {process_utils.py:141} INFO -   File "/tmp/venvgjg2bsg6/script.py", line 28, in <module>
[2021-04-06 22:44:19,723] {process_utils.py:141} INFO -     @task()
[2021-04-06 22:44:19,724] {process_utils.py:141} INFO - NameError: name 'task' is not defined
[2021-04-06 22:44:20,059] {taskinstance.py:1455} ERROR - Command '['/tmp/venvgjg2bsg6/bin/python', '/tmp/venvgjg2bsg6/script.py', '/tmp/venvgjg2bsg6/script.in', '/tmp/venvgjg2bsg6/script.out', '/tmp/venvgjg2bsg6/string_args.txt']' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 493, in execute
    super().execute(context=serializable_context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 531, in execute_callable
    string_args_filename,
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/process_utils.py", line 145, in execute_in_subprocess
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/tmp/venvgjg2bsg6/bin/python', '/tmp/venvgjg2bsg6/script.py', '/tmp/venvgjg2bsg6/script.in', '/tmp/venvgjg2bsg6/script.out', '/tmp/venvgjg2bsg6/string_args.txt']' returned non-zero exit status 1.
[2021-04-06 22:44:20,061] {taskinstance.py:1503} INFO - Marking task as FAILED. dag_id=trevor_dag, task_id=trevor, execution_date=20210406T224317, start_date=20210406T224317, end_date=20210406T224420

Question: how do I correctly use PythonVirtualenvOperator in a DAG built on the Airflow 2.x TaskFlow API?

Ser*_*kov 7

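PythonVirtualenvOperator expects a plain function as the argument to its python_callable parameter. Because you applied the task decorator to task1(), what PythonVirtualenvOperator receives is an Airflow operator (built by the decorator), not the function task1() itself. You need to remove the task decorator.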
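To see why the decorated task1 is the wrong thing to hand over, here is a simplified model that runs without Airflow. fake_task and FakeOperator are hypothetical stand-ins invented for illustration, not Airflow classes; the only assumption carried over from Airflow 2.0 is that calling a @task-decorated function builds an operator instead of running the function body.

```python
import functools


class FakeOperator:
    """Hypothetical stand-in for an Airflow operator (not a real Airflow class)."""

    def __init__(self, python_callable):
        self.python_callable = python_callable


def fake_task():
    """Hypothetical stand-in for @task(): calling the decorated function
    builds an operator object instead of executing the original body."""

    def decorator(func):
        @functools.wraps(func)  # keeps the name, so it still *looks* like task1
        def factory(*args, **kwargs):
            return FakeOperator(func)

        return factory

    return decorator


@fake_task()
def task1():
    return "hi"


def plain_task1():
    return "hi"


result_decorated = task1()    # an operator-like object, not "hi"
result_plain = plain_task1()  # the undecorated function really returns "hi"
print(type(result_decorated).__name__, result_plain)
```

Only the undecorated function behaves like the "plain function" that python_callable is meant to receive.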

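In addition, task1() is "cut out" of the DAG and executed on its own in the virtual environment: its source code is copied into a separate script (script.py in the log above). That is why the copied @task() line failed with NameError: name 'task' is not defined. For the same reason, you must do all necessary imports inside the function; here, that means importing sleep() there.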
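The "cut out and run separately" behaviour can be approximated without Airflow by executing a function's source text in an empty namespace. run_source_standalone is a hypothetical helper, and exec on a string is only an assumed approximation of the standalone script.py that PythonVirtualenvOperator writes; the point it illustrates is that only names defined inside the copied text exist there.

```python
import textwrap


def run_source_standalone(source, func_name):
    """Hypothetical helper: run a function's source text in an empty
    namespace (roughly like a fresh script would), then call the function."""
    namespace = {}  # nothing from the DAG file is visible here
    exec(textwrap.dedent(source), namespace)
    return namespace[func_name]()


decorated_src = '''
@task()
def task1():
    sleep(2)
    return "hi"
'''

module_import_src = '''
def task1():
    sleep(0)  # relies on "from time import sleep" at DAG-file level
    return "hi"
'''

self_contained_src = '''
def task1():
    from time import sleep  # import inside the function body
    sleep(0)
    return "hi"
'''

errors = {}
for label, src in [("decorated", decorated_src), ("module_import", module_import_src)]:
    try:
        run_source_standalone(src, "task1")
    except NameError as exc:
        errors[label] = str(exc)

print(errors)
print(run_source_standalone(self_contained_src, "task1"))
```

The decorated version fails on the task name and the module-level-import version fails on sleep; only the self-contained, undecorated function runs, which matches the corrected DAG in this answer.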

The following DAG works as expected:

from airflow.decorators import dag
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonVirtualenvOperator

default_args = {"owner": "airflow"}


@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["trevor"],
)
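def trevor_dag():
    # No @task decorator: PythonVirtualenvOperator needs the plain function.
    def task1():
        # Import inside the function, because only the function's source
        # is copied into the virtualenv script.
        from time import sleep

        sleep(100)
        return "hi"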

    t_task1 = PythonVirtualenvOperator(
        python_callable=task1,
        system_site_packages=False,
        requirements=["boto3", "apache-airflow"],
        task_id="trevor",
    )


mydag = trevor_dag()