在气流 DAG 中出现错误,不支持 >> 的操作数类型:“列表”和“列表”。任务的顺序和并行执行

Joe*_*988 1 python directed-acyclic-graphs airflow airflow-scheduler

我是 Apache 气流和 DAG 的新手。DAG 中共有 6 个任务(task1、task2、task3、task4、task5、task6)。但是在运行 DAG 时,我们收到以下错误。

DAG 不支持 >> 的操作数类型:“列表”和“列表”

下面是我的 DAG 代码。请帮忙。我是气流的新手。

from airflow import DAG
from datetime import datetime
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False
}

dag = DAG('DAG_FOR_TEST',default_args=default_args,schedule_interval=None,max_active_runs=3, start_date=datetime(2020, 7, 14)) 


#################### CREATE TASK #####################################   

task_1 = DatabricksSubmitRunOperator(
    task_id='task_1',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_1/task_1.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_1.driver.TestClass1',
        'parameters' : [
            '{{ dag_run.conf.json }}'       
        ]
    }
)



    
task_2 = DatabricksSubmitRunOperator(
    task_id='task_2',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',   
    libraries= [
        {
        'jar': 'dbfs:/task_2/task_2.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_2.driver.TestClass2',
        'parameters' : [
            '{{ dag_run.conf.json }}'                               
        ]
    }
)
    
task_3 = DatabricksSubmitRunOperator(
    task_id='task_3',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',   
    libraries= [
        {
        'jar': 'dbfs:/task_3/task_3.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_3.driver.TestClass3',
        'parameters' : [
            '{{ dag_run.conf.json }}'   
        ]
    }
) 

task_4 = DatabricksSubmitRunOperator(
    task_id='task_4',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_4/task_4.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_4.driver.TestClass4',
        'parameters' : [
            '{{ dag_run.conf.json }}'   
        ]
    }
) 

task_5 = DatabricksSubmitRunOperator(
    task_id='task_5',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_5/task_5.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_5.driver.TestClass5',
        'parameters' : [
            'json ={{ dag_run.conf.json }}' 
        ]
    }
) 

task_6 = DatabricksSubmitRunOperator(
    task_id='task_6',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_6/task_6.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_6.driver.TestClass6',
        'parameters' : ['{{ dag_run.conf.json }}'   
        ]
    }
) 

#################### ORDER OF OPERATORS ###########################  
 
    task_1.dag = dag
    task_2.dag = dag
    task_3.dag = dag
    task_4.dag = dag
    task_5.dag = dag
    task_6.dag = dag

task_1 >> [task_2 , task_3] >> [ task_4 , task_5 ] >> task_6 
Run Code Online (Sandbox Code Playgroud)

小智 9

Airflow 任务依赖项无法处理 [list]>>[list]。解决此问题的最简单方法是在多行上指定依赖项:

task_1 >> [task_2 , task_3]
task_2 >> [task_4, task_5]
task_3 >> [task_4, task_5]
[task_4 , task_5 ] >> task_6
Run Code Online (Sandbox Code Playgroud)


kax*_*xil 6

你想要的任务依赖是什么?你想运行task_4task_2只或之后task_2task_3

根据该答案,使用以下方法之一:

(如果 task_4 应该在两者之后运行task_2task_3完成,则使用此选项)

task_1 >> [task_2 , task_3]
task_2 >> [task_4, task_5] >> task_6
task_3 >> [task_4, task_5]
Run Code Online (Sandbox Code Playgroud)

或者

(如果 task_4 应该在task_2完成后task_5运行并且应该在task_3完成后运行,则使用此选项)

task_1 >> [task_2 , task_3]
task_2 >> task_4
task_3 >> task_5
[task_4, task_5] >> task_6
Run Code Online (Sandbox Code Playgroud)

提示,您无需执行以下操作:

    task_1.dag = dag
    task_2.dag = dag
    task_3.dag = dag
    task_4.dag = dag
    task_5.dag = dag
    task_6.dag = dag
Run Code Online (Sandbox Code Playgroud)

您可以将dag参数传递给您的任务本身,例如:

task_6 = DatabricksSubmitRunOperator(
    task_id='task_6',
    dag=dag,
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_6/task_6.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_6.driver.TestClass6',
        'parameters' : ['{{ dag_run.conf.json }}'   
        ]
    }
) 
Run Code Online (Sandbox Code Playgroud)

或使用DAG为您的上下文管理作为记录https://airflow.apache.org/docs/stable/concepts.html#context-manager和点(1)https://medium.com/datareply/airflow-lesser -已知的提示技巧和最佳实践-cf4d4a90f8f