生成 uuid 并在 Airflow DAG 中使用它

Jus*_*ase 5 uuid constants airflow

我正在尝试创建一个具有以下 2 个任务的动态气流:任务 1:使用生成的 UUID 作为其名称的一部分创建文件任务 2:对这些文件运行检查

所以我定义了一个变量“FILE_UUID”并将其设置如下:str(uuid.uuid4())。并且还创建了一个常量文件名:MY_FILE = '{file_uuid}_file.csv'.format(file_uuid=FILE_UUID}

Then - Task 1 is a bashOperator that get MY_FILE as part of the command, and it creates a file successfully. I can see the generated files include a specific UUID in the name,

TASK 2 fails is a PythonOperator that get MY_FILE as an op_args. But can't access the file. Logs show that it tries to access files with a different UUID.

Why is my "constant" is being run separately on every task? Is there any way to prevent that from happening?

I'm using Airflow 1.10, my executor is LocalExecutor.

I tried setting the constant outside the "with DAG" and inside it, also tries working with macros, but then PythonOperator just uses the macro strings literally using the values they hold.

bos*_*jak 9

您必须记住,DAG 定义文件是一种“配置脚本”,而不是运行 DAG 的实际可执行文件。这些任务在完全不同的环境中执行,大多数情况下甚至不在同一台机器上。把它想象成一个配置 XML,它设置你的任务,然后它们在云中的其他机器上构建和执行 - 但它是 Python 而不是 XML。

总之 - 您的 DAG 代码是 Python,但它不是在您的任务运行时执行的代码。因此,如果您在那里生成随机 uuid,它将在未知时间和多次评估 - 对于每个任务,在不同的机器上。

要使其跨任务保持一致,您需要找到另一种方式,例如:

  • 使用 XCOM,以便第一个任务使用它获得的 uuid,然后将其写入 XCOM 以供所有下游任务使用。
  • 将您的 uuid 与管道中的某个常量、来源、日期或其他任何内容(例如,如果它是一项日常任务,您可以从日期部分混合一些 dag/任务细节等来构建您的 uuid) - 无论什么都会使您的 uuid所有任务都相同,但对于独特的日子是独一无二的)

使用第一种方法(XCOM 的)的 DAG 示例:

from datetime import datetime
import uuid

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

with DAG(dag_id='global_uuid',
         schedule_interval='@daily',
         start_date=...) as dag:

    generate_uuid = PythonOperator(
        task_id='generate_uuid',
        python_callable=lambda: str(uuid.uuid4())
    )

    print_uuid1 = BashOperator(
        task_id='print1',
        bash_command='echo {{ task_instance.xcom_pull("generate_uuid") }}'
    )

    print_uuid2 = BashOperator(
        task_id='print2',
        bash_command='echo {{ task_instance.xcom_pull("generate_uuid") }}'
    )

    generate_uuid >> print_uuid1 >> print_uuid2

Run Code Online (Sandbox Code Playgroud)