Best way to loop over parameters in Airflow?


So far I've been getting familiar with Airflow and loving it.

However, one thing I'm not clear on is how to properly parameterize a DAG where I want to run the same DAG for multiple lines of business (lobs) in parallel. So essentially I want to run the DAG below for multiple lobs on each run, with each lob running in parallel.

So, say I define a variable that is an array of lobs such as "lob1", "lob2", and so on. I would like "mylob" in the BigQuery SQL statement below to be replaced by "lob1", then "lob2", etc.

I was thinking maybe I could store the lobs as a Variable in the UI and then loop over them in the DAG, but I'm not sure whether that would end up running sequentially, with each loop iteration waiting for its task to complete (a minimal sketch of this idea is shown below).
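A minimal sketch of the Variable idea, assuming Airflow 1.x-era imports to match the code below; the Variable name "lobs" and the DAG id are placeholders, and the Variable would need to be created in the UI with a JSON list value:

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

# Assumes a Variable named "lobs" exists with the value ["lob1", "lob2", "lob3"].
# Note this is read at DAG-parse time, on every parse of the file.
lobs = Variable.get("lobs", deserialize_json=True)

dag = DAG('lob_loop_sketch', schedule_interval='@once',
          default_args={'owner': 'airflow', 'start_date': datetime(2017, 3, 10)})

for lob in lobs:
    # One task per lob. Tasks with no dependencies between them can be
    # scheduled in parallel (subject to executor and parallelism settings),
    # so the loop itself does not force sequential execution.
    BashOperator(
        task_id='echo_%s' % lob,
        bash_command='echo {{ params.lob }}',
        params={'lob': lob},
        dag=dag,
    )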

I think another approach could be to use this parameterized DAG as a sub-DAG inside a larger driver DAG (a rough sketch follows). But again, I'm not sure whether that's a best-practice approach.
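For comparison, a rough sketch of the sub-DAG idea; the make_lob_subdag factory and all ids here are hypothetical, and note that SubDagOperator requires the sub-DAG's dag_id to have the form <parent_dag_id>.<task_id>:

from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {'owner': 'airflow', 'start_date': datetime(2017, 3, 10)}


def make_lob_subdag(parent_dag_id, lob):
    """Hypothetical factory building the parameterized per-lob DAG."""
    subdag = DAG('%s.process_%s' % (parent_dag_id, lob),
                 default_args=default_args, schedule_interval='@once')
    BashOperator(task_id='do_work', bash_command='echo %s' % lob, dag=subdag)
    return subdag


driver = DAG('lob_driver', default_args=default_args, schedule_interval='@once')

for lob in ["lob1", "lob2", "lob3"]:
    SubDagOperator(
        task_id='process_%s' % lob,
        subdag=make_lob_subdag(driver.dag_id, lob),
        dag=driver,
    )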

Any help or pointers would be much appreciated. I feel like I'm missing something obvious here, but I can't find an example like this anywhere.

"""
### My first dag to play around with bigquery and gcp stuff.
"""

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 10),    
    'email': ['xxx@xxx.com'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

with DAG('my_bq_dag_2', schedule_interval='30 */1 * * *',
         default_args=default_args) as dag:

    bq_msg_1 = BigQueryOperator(
        task_id='my_bq_task_1',
        bql='select "mylob" as lob, "Hello World!" as msg',
        destination_dataset_table='airflow.test1',
        write_disposition='WRITE_TRUNCATE',
        bigquery_conn_id='gcp_smoke'
    )

    bq_msg_1.doc_md = """\
    #### Task Documentation
    Append a "Hello World!" message string to the table [airflow.msg]
    """

    bq_msg_2 = BigQueryOperator(
        task_id='my_bq_task_2',
        bql='select "mylob" as lob, "Goodbye World!" as msg',
        destination_dataset_table='airflow.test1',
        write_disposition='WRITE_APPEND',
        bigquery_conn_id='gcp_smoke'
    )

    bq_msg_2.doc_md = """\
    #### Task Documentation
    Append a "Goodbye World!" message string to the table [airflow.msg]
    """

    # set dependencies
    bq_msg_2.set_upstream(bq_msg_1)

Update: I tried to get this working, but it never seems to get to lob2:

"""
### My first dag to play around with bigquery and gcp stuff.
"""

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 10),    
    'email': ['xxx@xxx.com'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('my_bq_dag_2', schedule_interval='@once', default_args=default_args)

lobs = ["lob1","lob2","lob3"]

for lob in lobs:

    templated_command = """
        select '{{ params.lob }}' as lob, concat(string(current_timestamp()),' - Hello - {{ ds }}') as msg
    """    

    bq_msg_1 = BigQueryOperator(
        dag = dag,
        task_id='my_bq_task_1',
        bql=templated_command,
        params={'lob': lob},
        destination_dataset_table='airflow.test1',
        write_disposition='WRITE_APPEND',
        bigquery_conn_id='gcp_smoke'
    )

Answer:

I think I've found an answer/approach that seems to work for me (the problem above was that the task ids were not unique).
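For reference, a minimal sketch of that fix applied to the update above: the only substantive change is making the task_id unique per loop iteration (the exact code in the blog post linked below may differ).

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 3, 10),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('my_bq_dag_2', schedule_interval='@once', default_args=default_args)

lobs = ["lob1", "lob2", "lob3"]

for lob in lobs:
    templated_command = """
        select '{{ params.lob }}' as lob,
               concat(string(current_timestamp()),' - Hello - {{ ds }}') as msg
    """

    BigQueryOperator(
        dag=dag,
        # The fix: a unique task_id per lob. With the fixed id
        # 'my_bq_task_1', every loop iteration defined the same task,
        # so only one lob's work ever ran.
        task_id='my_bq_task_%s' % lob,
        bql=templated_command,
        params={'lob': lob},
        destination_dataset_table='airflow.test1',
        write_disposition='WRITE_APPEND',
        bigquery_conn_id='gcp_smoke',
    )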

I wrote a small blog post about the example in case it's useful to anyone else:

http://engineering.pmc.com/2017/03/playing-around-with-apache-airflow-bigquery-62/