相关疑难解决方法(0)

在Airflow中创建动态工作流的正确方法

问题

在Airflow中是否有任何方法可以创建工作流程,以便任务数量B.*在任务A完成之前是未知的?我查看了子标记,但看起来它只能用于必须在Dag创建时确定的一组静态任务.

dag会触发工作吗?如果是这样,请你举个例子.

我有一个问题是,在任务A完成之前,无法知道计算任务C所需的任务B的数量.每个任务B.*将需要几个小时来计算,不能合并.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|
Run Code Online (Sandbox Code Playgroud)

想法#1

我不喜欢这个解决方案,因为我必须创建一个阻塞的ExternalTask​​Sensor,所有的任务B.*需要2到24小时才能完成.所以我认为这不是一个可行的解决方案.当然有一种更简单的方法吗?或者Airflow不是为此而设计的?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
Run Code Online (Sandbox Code Playgroud)

编辑1: …

python workflow airflow

66
推荐指数
8
解决办法
2万
查看次数

如何动态嵌套Airflow DAG?

我有一个由三个运算符组成的简单DAG。第一个是PythonOperator我们自己的功能,另外两个是标准的运营商从airflow.contribFileToGoogleCloudStorageOperatorGoogleCloudStorageToBigQueryOperator要准确)。它们按顺序工作。根据参数,我们的自定义任务会生成许多文件,通常在2到5之间。所有这些文件都必须由后续任务分别处理。这意味着我想要几个下游分支,但在运行DAG之前确切知道有多少个分支。

您将如何解决这个问题?

更新:

使用BranchPythonOperator他在另一个答复中提到的jhnclvr 作为出发点,我创建了一个根据条件跳过或继续执行分支的运算符。该方法仅是可行的,因为已知最大可能的分支数并且足够小。

运营商:

class SkipOperator(PythonOperator):
    def execute(self, context):
        boolean = super(SkipOperator, self).execute(context)
        session = settings.Session()
        for task in context['task'].downstream_list:
            if boolean is False:
                ti = TaskInstance(
                    task, execution_date=context['ti'].execution_date)
                ti.state = State.SKIPPED
                ti.start_date = datetime.now()
                ti.end_date = datetime.now()
                session.merge(ti)
        session.commit()
        session.close()
Run Code Online (Sandbox Code Playgroud)

用法:

def check(i, templates_dict=None, **kwargs):
    return len(templates_dict["data_list"].split(",")) > i

dag = DAG(
    dag_name,
    default_args=default_args,
    schedule_interval=None
)

load = CustomOperator(
    task_id="load_op",
    bash_command=' '.join([
        './command.sh'
        '--data-list {{ …
Run Code Online (Sandbox Code Playgroud)

airflow

5
推荐指数
1
解决办法
1665
查看次数

标签 统计

airflow ×2

python ×1

workflow ×1