相关疑难解决方法(0)

在Airflow中创建动态工作流的正确方法

问题

在Airflow中是否有任何方法可以创建工作流程,以便任务数量B.*在任务A完成之前是未知的?我查看了子标记,但看起来它只能用于必须在Dag创建时确定的一组静态任务.

dag会触发工作吗?如果是这样,请你举个例子.

我有一个问题是,在任务A完成之前,无法知道计算任务C所需的任务B的数量.每个任务B.*将需要几个小时来计算,不能合并.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|
Run Code Online (Sandbox Code Playgroud)

想法#1

我不喜欢这个解决方案,因为我必须创建一个阻塞的ExternalTask​​Sensor,所有的任务B.*需要2到24小时才能完成.所以我认为这不是一个可行的解决方案.当然有一种更简单的方法吗?或者Airflow不是为此而设计的?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
Run Code Online (Sandbox Code Playgroud)

编辑1: …

python workflow airflow

66
推荐指数
8
解决办法
2万
查看次数

Airflow 任务能否在运行时动态生成 DAG?

我有一个不规则上传的上传文件夹。对于每个上传的文件,我想生成一个特定于该文件的 DAG。

我的第一个想法是使用 FileSensor 来执行此操作,该文件传感器监视上传文件夹,并以新文件的存在为条件,触发创建单独 DAG 的任务。从概念上讲:

Sensor_DAG (FileSensor -> CreateDAGTask)

|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)
Run Code Online (Sandbox Code Playgroud)

在我最初的实现中,CreateDAGTaskPythonOperator通过将它们放置在全局命名空间中来创建 DAG 全局变量(请参阅此 SO 答案),如下所示:

Sensor_DAG (FileSensor -> CreateDAGTask)

|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)
Run Code Online (Sandbox Code Playgroud)

主 DAG 然后通过一个调用这个逻辑PythonOperator

# File-sensing DAG
default_args = {
    "depends_on_past" : False,
    "start_date"      : datetime(2020, 7, 16),
    "retries"         : 1,
    "retry_delay"     : timedelta(hours=5),
} …
Run Code Online (Sandbox Code Playgroud)

airflow airflow-scheduler

7
推荐指数
1
解决办法
6257
查看次数

Airflow:如何从 PythonOperator 中的 python_callable 内部创建子运算符

我有一个简单的 python 运算符,定义如下:

loop_records = PythonOperator(
    task_id = 'loop_records',
    provide_context = True,
    python_callable = loop_topic_records,
    dag = dag    
)
Run Code Online (Sandbox Code Playgroud)

这个 python 操作符调用loop_topic_records,定义如下:

def loop_topic_records(**context):
    parent_dag = context['dag']
    for i in range(3):
        op = DummyOperator(
            task_id="child_" + str(i),
            dag=parent_dag
        )
        logging.info('Child operator ' + str(i))
        loop_records >> op
Run Code Online (Sandbox Code Playgroud)

我看到代码没有引发任何错误。它甚至打印Child operator 0..2在日志中。但是,在 dag 中Graph view我没有看到子操作符,我只看到loop_records节点,就好像我的 dag 只包含一个操作符一样。那么,这有什么问题呢?我该如何解决?

python airflow

3
推荐指数
1
解决办法
1361
查看次数

标签 统计

airflow ×3

python ×2

airflow-scheduler ×1

workflow ×1