I am trying to figure out how to use the PythonVirtualenvOperator inside a DAG that I am creating with the TaskFlow API in Apache Airflow 2.0.1.
My example DAG looks like this:
from time import sleep

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonVirtualenvOperator

default_args = {
    'owner': 'airflow'
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['trevor'])
def trevor_dag():
    @task()
    def task1():
        sleep(2)
        return 'hi'

    t_task1 = PythonVirtualenvOperator(python_callable=task1, system_site_packages=False, requirements=['boto3', 'apache-airflow'], task_id='trevor')

mydag = trevor_dag()
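The DAG fails to execute in the Python virtual environment.
It appears that, even though I install the apache-airflow package into the virtual environment, the TaskFlow API names are not found there.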
[2021-04-06 22:44:19,324] {process_utils.py:135} INFO - Executing cmd: /tmp/venvgjg2bsg6/bin/python /tmp/venvgjg2bsg6/script.py /tmp/venvgjg2bsg6/script.in /tmp/venvgjg2bsg6/script.out /tmp/venvgjg2bsg6/string_args.txt
[2021-04-06 22:44:19,329] {process_utils.py:137} INFO - Output:
[2021-04-06 22:44:19,722] {process_utils.py:141} INFO - Traceback (most recent call last):
[2021-04-06 22:44:19,723] {process_utils.py:141} INFO - File "/tmp/venvgjg2bsg6/script.py", line 28, in <module>
[2021-04-06 22:44:19,723] {process_utils.py:141} INFO - @task()
[2021-04-06 22:44:19,724] {process_utils.py:141} INFO - NameError: name 'task' is not defined
[2021-04-06 22:44:20,059] {taskinstance.py:1455} ERROR - Command '['/tmp/venvgjg2bsg6/bin/python', '/tmp/venvgjg2bsg6/script.py', '/tmp/venvgjg2bsg6/script.in', '/tmp/venvgjg2bsg6/script.out', '/tmp/venvgjg2bsg6/string_args.txt']' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 493, in execute
    super().execute(context=serializable_context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 531, in execute_callable
    string_args_filename,
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/process_utils.py", line 145, in execute_in_subprocess
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/tmp/venvgjg2bsg6/bin/python', '/tmp/venvgjg2bsg6/script.py', '/tmp/venvgjg2bsg6/script.in', '/tmp/venvgjg2bsg6/script.out', '/tmp/venvgjg2bsg6/string_args.txt']' returned non-zero exit status 1.
[2021-04-06 22:44:20,061] {taskinstance.py:1503} INFO - Marking task as FAILED. dag_id=trevor_dag, task_id=trevor, execution_date=20210406T224317, start_date=20210406T224317, end_date=20210406T224420
Question: How do I properly use the PythonVirtualenvOperator in a DAG built with the Airflow 2.x TaskFlow API?
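PythonVirtualenvOperator expects a function to execute as the argument to its python_callable parameter. Because you applied the task decorator to task1(), PythonVirtualenvOperator receives an Airflow operator instead of the plain function task1(). You need to remove the task decorator.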
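The @task() line at script.py line 28 in the question's log suggests that the decorator travels into the generated script together with the function's source text. Below is a minimal sketch of that effect, assuming the operator reads the callable's source with something like inspect.getsource (an assumption about Airflow internals that I have not verified for 2.0.1). The task decorator and the toy_dag module here are hypothetical stand-ins, not Airflow code.

```python
import importlib.util
import inspect
import tempfile
import textwrap
from pathlib import Path

# Hypothetical stand-in module: `task` is a toy decorator, not Airflow's.
MODULE_SOURCE = textwrap.dedent('''
    import functools

    def task():
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)
            return wrapper
        return decorator

    @task()
    def task1():
        return "hi"
''')


def decorated_source() -> str:
    """Import the toy module from a temp file and return the source of task1."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "toy_dag.py"
        path.write_text(MODULE_SOURCE)
        spec = importlib.util.spec_from_file_location("toy_dag", path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        # getsource follows functools.wraps back to the original task1
        # and returns its text including the decorator line.
        return inspect.getsource(module.task1)


source = decorated_source()
print(source)
```

The printed source starts with the decorator line, so a script that pastes this text into a fresh interpreter would hit a name it never imported, which matches the NameError in the log.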
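In addition, task1() will be "cut out" of the DAG and executed on its own in the virtual environment. You therefore have to do all necessary imports inside the function, in this case the import of sleep.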
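To see why the imports have to live inside the function, here is a rough sketch that mimics the isolation with a plain subprocess instead of a real virtualenv. The run_isolated helper and the script layout are hypothetical and only illustrate the idea; this is not how Airflow generates its script.

```python
import subprocess
import sys
import tempfile
import textwrap
from pathlib import Path


def run_isolated(func_source: str, func_name: str) -> subprocess.CompletedProcess:
    """Write only the function's source to a fresh script and run it in a new interpreter."""
    script = textwrap.dedent(func_source) + f"\nprint({func_name}())\n"
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "script.py"
        path.write_text(script)
        return subprocess.run(
            [sys.executable, str(path)], capture_output=True, text=True
        )


# Relies on a module-level `from time import sleep` that is not copied along.
outer_import = '''
def task1():
    sleep(0.1)
    return "hi"
'''

# Carries its own import, so it still works when run on its own.
inner_import = '''
def task1():
    from time import sleep
    sleep(0.1)
    return "hi"
'''

failed = run_isolated(outer_import, "task1")
worked = run_isolated(inner_import, "task1")
print(failed.returncode, failed.stderr.strip().splitlines()[-1])
print(worked.returncode, worked.stdout.strip())
```

The first run should exit non-zero with a NameError for sleep, while the second returns the function's value, because only the text of the function body reaches the new interpreter.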
The following DAG will work as expected:
from airflow.decorators import dag
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonVirtualenvOperator

default_args = {"owner": "airflow"}

@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["trevor"],
)
def trevor_dag():
    def task1():
        from time import sleep
        sleep(100)
        return "hi"

    t_task1 = PythonVirtualenvOperator(
        python_callable=task1,
        system_site_packages=False,
        requirements=["boto3", "apache-airflow"],
        task_id="trevor",
    )

mydag = trevor_dag()