cos*_*ouc 66 python workflow airflow
Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A? I have looked at SubDAGs, but it looks like they only work with a static set of tasks that has to be determined at DAG creation.
Would DAG triggers work? If so, could you please provide an example?
My problem is that it is impossible to know the number of B tasks needed to compute Task C until Task A has completed. Each Task B.* will take several hours to compute and cannot be combined.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
I do not like this solution because I would have to create a blocking ExternalTaskSensor, and all of the Task B.* will take between 2 and 24 hours to complete. So I do not consider this a viable solution. Surely there is an easier way? Or was Airflow not designed for this?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG through python_callable in TriggerDagRunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Edit 1: As of now, this question still does not have a great answer. I have been contacted by several people looking for a solution.
Ole*_*min 21
Here is how I did it with a similar request, without any SubDAGs:
First create a method that returns whatever values you want:
def values_function():
    # compute or fetch the list of values that drives how many jobs are created
    return values
Next create the method that will generate the jobs dynamically:
def group(number, **kwargs):
#load the values if needed in the command you plan to execute
dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
return BashOperator(
task_id='JOB_NAME_{}'.format(number),
bash_command='script.sh {} {}'.format(dyn_value, number),
dag=dag)
Then combine them:
push_func = PythonOperator(
task_id='push_func',
provide_context=True,
python_callable=values_function,
dag=dag)
complete = DummyOperator(
task_id='All_jobs_completed',
dag=dag)
for i in values_function():
push_func >> group(i) >> complete
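Note that values_function() is evaluated when the DAG file is parsed, not at run time, so it has to be cheap and available to the scheduler. A minimal sketch of one common way to back it, assuming an upstream process stores the list in an Airflow Variable named job_values (the Variable name and JSON format are illustrative):

from airflow.models import Variable

def values_function():
    # e.g. '["a", "b", "c"]' written as a JSON Variable by Task A or another process
    return Variable.get('job_values', deserialize_json=True, default_var=[])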
Gab*_*abe 18
For Airflow v2.3 and above only:
This is implemented with Dynamic Task Mapping, which is only available in Airflow 2.3 and higher.
More documentation and examples are available here:
Example:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task


@task
def make_list():
# This can also be from an API call, checking a database, -- almost anything you like, as long as the
# resulting list/dictionary can be stored in the current XCom backend.
return [1, 2, {"a": "b"}, "str"]
@task
def consumer(arg):
print(list(arg))
with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
consumer.expand(arg=make_list())
Example 2:
from airflow import XComArg
task = MyOperator(task_id="source")
downstream = MyOperator2.partial(task_id="consumer").expand(input=XComArg(task))
The Graph view and Tree view have also been updated:
The related issue is here:
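As a rough illustration of how the question's Task A -> Task B.* -> Task C shape maps onto dynamic task mapping, here is a sketch; task_a, task_b, task_c and their payloads are made-up names, not taken from the Airflow docs:

from datetime import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2022, 4, 2), schedule_interval=None)
def a_b_c():

    @task
    def task_a():
        # Task A decides at runtime how many B tasks are needed
        return [{'b_number': i} for i in range(5)]

    @task
    def task_b(b_conf):
        # each mapped B instance receives one element of task_a's return value
        return b_conf['b_number'] * 2

    @task
    def task_c(results):
        # receives the collected results of all mapped B instances
        print(sum(results))

    task_c(task_b.expand(b_conf=task_a()))


example_dag = a_b_c()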
Chr*_*eck 10
I have figured out a way to create workflows based on the result of previous tasks.
Basically, what you want to do is have two SubDAGs with the following:

1. XCom-push a list (or whatever you need to create the dynamic workflow later) in the SubDAG that gets executed first (see return_list() in test1.py).
2. Pass the main DAG object as a parameter to your second SubDAG.
3. From the main DAG object you can get a list of its task instances; parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1] filters it down to a task of the current run, and you could probably add more filters here.
4. With that task instance, use xcom_pull to get the value you need, specifying the dag_id of the first SubDAG: dag_id='%s.%s' % (parent_dag_name, 'test1').
5. Use the list/value to dynamically create the tasks in the second SubDAG.

I have tested this in my local Airflow installation and it works fine. I don't know whether the xcom_pull part will have problems if more than one instance of the DAG runs at the same time, but then you would probably use a unique key or something similar to uniquely identify the XCom value you want. Step 3 could probably be optimized to be 100% sure to get a specific task of the current main DAG, but for my use this performs well enough; I think one only needs one task_instance object to use xcom_pull.
Also note that I clean the XComs for the first SubDAG before every execution, just to make sure that I don't accidentally get any wrong values.
It is quite hard to explain, so I hope the code below makes everything clear:
test1.py
from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
log = logging.getLogger(__name__)
def test1(parent_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.test1' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date,
)
def return_list():
return ['test1', 'test2']
list_extract_folder = PythonOperator(
task_id='list',
dag=dag,
python_callable=return_list
)
clean_xcoms = PostgresOperator(
task_id='clean_xcoms',
postgres_conn_id='airflow_db',
sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
dag=dag)
clean_xcoms >> list_extract_folder
return dag
test2.py
from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator
log = logging.getLogger(__name__)
def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
dag = DAG(
'%s.test2' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date
)
if len(parent_dag.get_active_runs()) > 0:
test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
dag_id='%s.%s' % (parent_dag_name, 'test1'),
task_ids='list')
if test_list:
for i in test_list:
test = DummyOperator(
task_id=i,
dag=dag
)
return dag
And the main workflow:
test.py
from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2
DAG_NAME = 'test-dag'
dag = DAG(DAG_NAME,
description='Test workflow',
catchup=False,
schedule_interval='0 0 * * *',
start_date=datetime(2018, 8, 24))
test1 = SubDagOperator(
subdag=test1(DAG_NAME,
dag.start_date,
dag.schedule_interval),
task_id='test1',
dag=dag
)
test2 = SubDagOperator(
subdag=test2(DAG_NAME,
dag.start_date,
dag.schedule_interval,
parent_dag=dag),
task_id='test2',
dag=dag
)
test1 >> test2
I think what you are looking for is creating DAGs dynamically. I ran into this situation a few days ago, and after some searching I found this blog post.
Dynamic Task Generation
start = DummyOperator(
task_id='start',
dag=dag
)
end = DummyOperator(
task_id='end',
dag=dag)
def createDynamicETL(task_id, callableFunction, args):
task = PythonOperator(
task_id = task_id,
provide_context=True,
#Eval is used since the callableFunction var is of type string
#while the python_callable argument for PythonOperators only receives objects of type callable not strings.
python_callable = eval(callableFunction),
op_kwargs = args,
xcom_push = True,
dag = dag,
)
return task
Setting up the DAG workflow
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
# Use safe_load instead to load the YAML file
configFile = yaml.safe_load(f)
# Extract table names and fields to be processed
tables = configFile['tables']
# In this loop tasks are created for each table defined in the YAML file
for table in tables:
for table, fieldName in table.items():
# In our example, first step in the workflow for each table is to get SQL data from db.
# Remember task id is provided in order to exchange data among tasks generated in dynamic way.
get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
'getSQLData',
{'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
'dbname': configFile['dbname']})
# Second step is upload data to s3
upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
'uploadDataToS3',
{'previous_task_id': '{}-getSQLData'.format(table),
'bucket_name': configFile['bucket_name'],
'prefix': configFile['prefix']})
# This is where the magic lies. The idea is that
# once tasks are generated they should linked with the
# dummy operators generated in the start and end tasks.
# Then you are done!
start >> get_sql_data_task
get_sql_data_task >> upload_to_s3_task
upload_to_s3_task >> end
Putting it all together, the full DAG looks like this:

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
start = DummyOperator(
task_id='start',
dag=dag
)
def createDynamicETL(task_id, callableFunction, args):
task = PythonOperator(
task_id=task_id,
provide_context=True,
# Eval is used since the callableFunction var is of type string
# while the python_callable argument for PythonOperators only receives objects of type callable not strings.
python_callable=eval(callableFunction),
op_kwargs=args,
xcom_push=True,
dag=dag,
)
return task
end = DummyOperator(
task_id='end',
dag=dag)
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
# use safe_load instead to load the YAML file
configFile = yaml.safe_load(f)
# Extract table names and fields to be processed
tables = configFile['tables']
# In this loop tasks are created for each table defined in the YAML file
for table in tables:
for table, fieldName in table.items():
# In our example, first step in the workflow for each table is to get SQL data from db.
# Remember task id is provided in order to exchange data among tasks generated in dynamic way.
get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
'getSQLData',
{'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
'dbname': configFile['dbname']})
# Second step is upload data to s3
upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
'uploadDataToS3',
{'previous_task_id': '{}-getSQLData'.format(table),
'bucket_name': configFile['bucket_name'],
'prefix': configFile['prefix']})
# This is where the magic lies. The idea is that
# once tasks are generated they should linked with the
# dummy operators generated in the start and end tasks.
# Then you are done!
start >> get_sql_data_task
get_sql_data_task >> upload_to_s3_task
upload_to_s3_task >> end
This was very helpful to me, and I hope it helps someone else as well.
Too much? Anyway.

A lot of the other answers here are a bit square-peg-round-hole. They add complicated new operators, abuse built-in variables, or somewhat fail to answer the question. I was not particularly happy with any of them, because they either hide their behaviour when viewed through the web UI, are prone to breaking, or require a lot of custom code (which is also prone to breaking).

This solution uses built-in functionality, requires no new operators and only limited additional code, the DAGs are visible through the UI without any tricks, and it follows Airflow best practices (see idempotency).

The solution to this problem is fairly complicated, so I have split it into parts.
Can a DAG trigger a dynamic number of other DAGs?

Yes. Sort of. Without writing any new operators, it is possible to have a DAG trigger a dynamic number of other DAGs using only built-in operators. This can then be extended to have a DAG depend on a dynamic number of other DAGs (see waiting for tasks to complete, below). This is similar to flinz's solution, but more robust and with much less custom code.
This is done using a BranchPythonOperator that selectively triggers two other TriggerDagRunOperators. One of these recursively re-calls the current DAG, the other calls an external DAG, the target DAG.
An example config that can be used to trigger the DAG is given at the top of recursive_dag.py.
print_conf.py (an example DAG to trigger)
from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def print_output(dag_run):
dag_conf = dag_run.conf
if 'output' in dag_conf:
output = dag_conf['output']
else:
output = 'no output found'
print(output)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'print_output',
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as dag:
print_output = PythonOperator(
task_id='print_output_task',
python_callable=print_output
)
recursive_dag.py (where the magic happens)
"""
DAG that can be used to trigger multiple other dags.
For example, trigger with the following config:
{
"task_list": ["print_output","print_output"],
"conf_list": [
{
"output": "Hello"
},
{
"output": "world!"
}
]
}
"""
from datetime import timedelta
import json
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
dag_id = 'branch_recursive'
branch_id = 'branch_operator'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = repeat_task_id + '_conf'
next_task_id = 'next_dag_operator'
next_task_conf = next_task_id + '_conf'
def choose_branch(task_instance, dag_run):
dag_conf = dag_run.conf
task_list = dag_conf['task_list']
next_task = task_list[0]
later_tasks = task_list[1:]
conf_list = dag_conf['conf_list']
# dump to string because value is stringified into
# template string, is then parsed.
next_conf = json.dumps(conf_list[0])
later_confs = conf_list[1:]
task_instance.xcom_push(key=next_task_id, value=next_task)
task_instance.xcom_push(key=next_task_conf, value=next_conf)
if later_tasks:
repeat_conf = json.dumps({
'task_list': later_tasks,
'conf_list': later_confs
})
task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
return [next_task_id, repeat_task_id]
return next_task_id
def add_braces(in_string):
return '{{' + in_string + '}}'
def make_templated_pull(key):
pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{branch_id}\')'
return add_braces(pull)
with DAG(
dag_id,
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as dag:
branch = BranchPythonOperator(
task_id=branch_id,
python_callable=choose_branch
)
trigger_next = TriggerDagRunOperator(
task_id=next_task_id,
trigger_dag_id=make_templated_pull(next_task_id),
conf=make_templated_pull(next_task_conf)
)
trigger_repeat = TriggerDagRunOperator(
task_id=repeat_task_id,
trigger_dag_id=dag_id,
conf=make_templated_pull(repeat_task_conf)
)
branch >> [trigger_next, trigger_repeat]
The advantage of this solution is that it uses very limited custom code. flinz's solution can fail part way through, resulting in some scheduled tasks completing and others not; on retry, DAGs may then either be scheduled to run twice, or the first DAG fails again, leaving failed tasks that have done partial work. This approach, on the other hand, tells you which DAGs failed to trigger, and retries only the DAGs that failed to trigger. It is therefore idempotent; the other is not.

Can a DAG wait for a dynamic number of other DAGs to complete?

Yes, but... If the tasks do not run in parallel, this is easy to do. Running them in parallel is more complicated.
To run sequentially, the important changes are: use wait_for_completion=True on trigger_next, use a Python operator to set up the XCom values before trigger_next, add a branch operator that enables or disables the repeat task, and then have a linear dependency (a sketch of this variant follows below):
setup_xcom >> trigger_next >> branch >> trigger_repeat
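For what it's worth, here is a minimal sketch of what that sequential variant might look like, reusing the config shape from recursive_dag.py ({"task_list": [...], "conf_list": [...]}); the task ids, the push_next_conf helper and the 'done' end task are illustrative names, not part of the original code:

import json

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

seq_dag_id = 'branch_sequential'
setup_id = 'setup_xcom'


def pull_from_setup(key):
    # templated xcom_pull against the setup task
    return '{{ ti.xcom_pull(key="' + key + '", task_ids="' + setup_id + '") }}'


def push_next_conf(task_instance, dag_run):
    conf = dag_run.conf
    task_list, conf_list = conf['task_list'], conf['conf_list']
    task_instance.xcom_push(key='next_dag', value=task_list[0])
    task_instance.xcom_push(key='next_conf', value=json.dumps(conf_list[0]))
    if task_list[1:]:
        task_instance.xcom_push(key='repeat_conf', value=json.dumps(
            {'task_list': task_list[1:], 'conf_list': conf_list[1:]}))


def choose_repeat_or_done(task_instance):
    # repeat only while there is something left to trigger
    if task_instance.xcom_pull(key='repeat_conf', task_ids=setup_id):
        return 'repeat'
    return 'done'


with DAG(seq_dag_id, start_date=days_ago(2), schedule_interval=None) as dag:
    setup_xcom = PythonOperator(task_id=setup_id, python_callable=push_next_conf)
    trigger_next = TriggerDagRunOperator(
        task_id='trigger_next',
        trigger_dag_id=pull_from_setup('next_dag'),
        conf=pull_from_setup('next_conf'),
        wait_for_completion=True,  # block until the triggered DAG run has finished
        poke_interval=30,
    )
    branch = BranchPythonOperator(task_id='branch', python_callable=choose_repeat_or_done)
    trigger_repeat = TriggerDagRunOperator(
        task_id='repeat',
        trigger_dag_id=seq_dag_id,
        conf=pull_from_setup('repeat_conf'),
    )
    done = DummyOperator(task_id='done')

    setup_xcom >> trigger_next >> branch >> [trigger_repeat, done]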
To run in parallel, you can recursively chain, in a similar way, several ExternalTaskSensors that use templated external_dag_id values together with the timestamps associated with the triggered DAG runs. To get the timestamp of a triggered DAG run, you trigger it using the timestamp of the triggering DAG. These sensors then wait, one by one, for all of the created DAGs to complete, before a final DAG is triggered. Code below; this time I have added random sleeps to the print output DAG, so that the wait DAGs actually do some waiting.
Note: recurse_wait_dag.py now defines 2 DAGs, and both of them need to be enabled for all of this to work.
An example config that can be used to trigger the DAG is given at the top of recurse_wait_dag.py.
print_conf.py (modified to add a random sleep)
"""
Simple dag that prints the output in DAG config
Used to demo TriggerDagRunOperator (see recursive_dag.py)
"""
from datetime import timedelta
from time import sleep
from random import randint
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def print_output(dag_run):
sleep_time = randint(15,30)
print(f'sleeping for time: {sleep_time}')
sleep(sleep_time)
dag_conf = dag_run.conf
if 'output' in dag_conf:
output = dag_conf['output']
else:
output = 'no output found'
print(output)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'print_output',
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as dag:
print_output = PythonOperator(
task_id='print_output_task',
python_callable=print_output
)
recurse_wait_dag.py (where even more magic happens)
"""
DAG that can be used to trigger multiple other dags,
waits for all dags to execute, then triggers a final dag.
For example, trigger the DAG 'recurse_then_wait' with the following config:
{
"final_task": "print_output",
"task_list": ["print_output","print_output"],
"conf_list": [
{
"output": "Hello"
},
{
"output": "world!"
}
]
}
"""
from datetime import timedelta
import json
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils import timezone
from common import make_templated_pull
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
def to_conf(id):
return f'{id}_conf'
def to_execution_date(id):
return f'{id}_execution_date'
def to_ts(id):
return f'{id}_ts'
recurse_dag_id = 'recurse_then_wait'
branch_id = 'recursive_branch'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = to_conf(repeat_task_id)
next_task_id = 'next_dag_operator'
next_task_conf = to_conf(next_task_id)
next_task_execution_date = to_execution_date(next_task_id)
end_task_id = 'end_task'
end_task_conf = to_conf(end_task_id)
wait_dag_id = 'wait_after_recurse'
choose_wait_id = 'choose_wait'
next_wait_id = 'next_wait'
next_wait_ts = to_ts(next_wait_id)
def choose_branch(task_instance, dag_run, ts):
dag_conf = dag_run.conf
task_list = dag_conf['task_list']
next_task = task_list[0]
# can't have multiple dag runs of same DAG with same timestamp
assert next_task != recurse_dag_id
later_tasks = task_list[1:]
conf_list = dag_conf['conf_list']
next_conf = json.dumps(conf_list[0])
later_confs = conf_list[1:]
triggered_tasks = dag_conf.get('triggered_tasks', []) + [(next_task, ts)]
task_instance.xcom_push(key=next_task_id, value=next_task)
task_instance.xcom_push(key=next_task_conf, value=next_conf)
task_instance.xcom_push(key=next_task_execution_date, value=ts)
if later_tasks:
repeat_conf = json.dumps({
'task_list': later_tasks,
'conf_list': later_confs,
'triggered_tasks': triggered_tasks,
'final_task': dag_conf['final_task']
})
task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
return [next_task_id, repeat_task_id]
end_conf = json.dumps({
'tasks_to_wait': triggered_tasks,
'final_task': dag_conf['final_task']
})
task_instance.xcom_push(key=end_task_conf, value=end_conf)
return [next_task_id, end_task_id]
def choose_wait_target(task_instance, dag_run):
dag_conf = dag_run.conf
tasks_to_wait = dag_conf['tasks_to_wait']
next_task, next_ts = tasks_to_wait[0]
later_tasks = tasks_to_wait[1:]
task_instance.xcom_push(key=next_wait_id, value=next_task)
task_instance.xcom_push(key=next_wait_ts, value=next_ts)
if later_tasks:
repeat_conf = json.dumps({
'tasks_to_wait': later_tasks,
'final_task': dag_conf['final_task']
})
task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
def execution_date_fn(_, task_instance):
date_str = task_instance.xcom_pull(key=next_wait_ts, task_ids=choose_wait_id)
return timezone.parse(date_str)
def choose_wait_branch(task_instance, dag_run):
dag_conf = dag_run.conf
tasks_to_wait = dag_conf['tasks_to_wait']
if len(tasks_to_wait) == 1:
return end_task_id
return repeat_task_id
with DAG(
recurse_dag_id,
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as recursive_dag:
branch = BranchPythonOperator(
task_id=branch_id,
python_callable=choose_branch
)
trigger_next = TriggerDagRunOperator(
task_id=next_task_id,
trigger_dag_id=make_templated_pull(next_task_id, branch_id),
execution_date=make_templated_pull(next_task_execution_date, branch_id),
conf=make_templated_pull(next_task_conf, branch_id)
)
trigger_repeat = TriggerDagRunOperator(
task_id=repeat_task_id,
trigger_dag_id=recurse_dag_id,
conf=make_templated_pull(repeat_task_conf, branch_id)
)
trigger_end = TriggerDagRunOperator(
task_id=end_task_id,
trigger_dag_id=wait_dag_id,
conf=make_templated_pull(end_task_conf, branch_id)
)
branch >> [trigger_next, trigger_repeat, trigger_end]
with DAG(
wait_dag_id,
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as wait_dag:
py_operator = PythonOperator(
task_id=choose_wait_id,
python_callable=choose_wait_target
)
sensor = ExternalTaskSensor(
task_id='do_wait',
external_dag_id=make_templated_pull(next_wait_id, choose_wait_id),
execution_date_fn=execution_date_fn
)
branch = BranchPythonOperator(
task_id=branch_id,
python_callable=choose_wait_branch
)
trigger_repeat = TriggerDagRunOperator(
task_id=repeat_task_id,
trigger_dag_id=wait_dag_id,
conf=make_templated_pull(repeat_task_conf, choose_wait_id)
)
trigger_end = TriggerDagRunOperator(
task_id=end_task_id,
trigger_dag_id='{{ dag_run.conf[\'final_task\'] }}'
)
py_operator >> sensor >> branch >> [trigger_repeat, trigger_end]
Great, but you want to actually use this. So, what do you need to do? The question includes an example that tries to do the following:
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
To achieve the goal of the question (example implementation below), you need to separate Tasks A, B and C into their own DAGs. Then, at the end of DAG A, add a new operator that triggers the 'recurse_then_wait' DAG above. Pass to it a config that includes the config needed for each B DAG, as well as the B DAG id (this can easily be changed to use different DAGs, go nuts). Then include the name of DAG C, the final DAG to run. The config should look like this:
{
"final_task": "C_DAG",
"task_list": ["B_DAG","B_DAG"],
"conf_list": [
{
"b_number": 1,
"more_stuff": "goes_here"
},
{
"b_number": 2,
"foo": "bar"
}
]
}
Once implemented, it should look something like this:
trigger_recurse.py
from datetime import timedelta
import json
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from recurse_wait_dag import recurse_dag_id
def add_braces(in_string):
return '{{' + in_string + '}}'
def make_templated_pull(key, task_id):
pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{task_id}\')'
return add_braces(pull)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
setup_trigger_conf_id = 'setup_trigger_conf'
trigger_conf_key = 'trigger_conf'
def setup_trigger_conf(task_instance):
trigger_conf = {
'final_task': 'print_output',
'task_list': ['print_output','print_output'],
'conf_list': [
{
'output': 'Hello'
},
{
'output': 'world!'
}
]
}
print('Triggering the following tasks')
for task, conf in zip(trigger_conf['task_list'], trigger_conf['conf_list']):
print(f' task: {task} with config {json.dumps(conf)}')
print(f'then waiting for completion before triggering {trigger_conf["final_task"]}')
task_instance.xcom_push(key=trigger_conf_key, value=json.dumps(trigger_conf))
with DAG(
'trigger_recurse_example',
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as dag:
py_operator = PythonOperator(
task_id=setup_trigger_conf_id,
python_callable=setup_trigger_conf
)
trigger_operator = TriggerDagRunOperator(
task_id='trigger_call_and_wait',
trigger_dag_id=recurse_dag_id,
conf=make_templated_pull(trigger_conf_key, setup_trigger_conf_id)
)
py_operator >> trigger_operator
All of this ends up looking like the following, where the vertical and horizontal lines show where one DAG triggers another DAG:
A
|
Recurse - B.1
|
Recurse - B.2
|
...
|
Recurse - B.N
|
Wait for B.1
|
Wait for B.2
|
...
|
Wait for B.N
|
C
Limitations

Tasks are no longer visible on a single graph. This is probably the biggest problem with this approach. By adding tags to all of the associated DAGs, the DAGs can at least be viewed together, but relating several parallel runs of DAG B to a run of DAG A is still messy. However, since a single DAG run shows its input config, each DAG B run does not really depend on DAG A, only on its input config, so this relationship can be at least partially ignored.
Tasks can no longer communicate using XCom. The B tasks can receive input from Task A via the DAG config, but Task C cannot get the output of the B tasks. The results of all of the B tasks should be put into a known location and then read by Task C (a sketch of this follows after this list).
The config argument of 'recurse_and_wait' could perhaps be improved to combine task_list and conf_list, but this solves the problem as stated.
There is no config for the final DAG. That should be trivial to solve.
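For the XCom limitation above, here is a minimal sketch of the "known location" handoff, assuming a shared filesystem path and a batch_id key in the DAG config (both of which are illustrative; any storage both DAGs can reach, such as S3 via a hook, works the same way):

import json
from pathlib import Path

RESULTS_ROOT = Path('/shared/results')  # illustrative location reachable by both DAGs


def write_b_result(dag_run):
    # python_callable for each DAG B run: drop its result where DAG C can find it
    conf = dag_run.conf
    out_dir = RESULTS_ROOT / conf['batch_id']
    out_dir.mkdir(parents=True, exist_ok=True)
    result = {'b_number': conf['b_number'], 'value': 42}  # whatever B actually computes
    (out_dir / 'b_{}.json'.format(conf['b_number'])).write_text(json.dumps(result))


def read_b_results(dag_run):
    # python_callable for DAG C: read everything the B runs produced for this batch
    batch_dir = RESULTS_ROOT / dag_run.conf['batch_id']
    results = [json.loads(p.read_text()) for p in sorted(batch_dir.glob('b_*.json'))]
    print('Task C sees {} B results'.format(len(results)))
    return results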
OA: "Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A?"
The short answer is no. Airflow builds the DAG flow before it starts running it.
That said, we came to a simple conclusion: we do not have such a need. When you want to parallelize some work, you should evaluate the resources you have available, not the number of items to process.
We did it like this: we dynamically generate a fixed number of tasks, say 10, that split the job. For example, if we need to process 100 files, each task will process 10 of them. I will post the code later today.
Update
Here is the code, sorry for the delay.
from datetime import datetime, timedelta
import airflow
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 8),
'email': ['myemail@gmail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}
dag = airflow.DAG(
'parallel_tasks_v1',
schedule_interval="@daily",
catchup=False,
default_args=args)
# You can read this from variables
parallel_tasks_total_number = 10
start_task = DummyOperator(
task_id='start_task',
dag=dag
)
# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
    return PythonOperator(
provide_context=True,
task_id='parallel_task_' + str(current_task_number),
python_callable=parallelTask,
# your task will take as input the total number and the current number to elaborate a chunk of total elements
op_args=[current_task_number, int(parallel_tasks_total_number)],
dag=dag)
end = DummyOperator(
task_id='end',
dag=dag)
for page in range(int(parallel_tasks_total_number)):
created_task = create_dynamic_task(page)
start_task >> created_task
created_task >> end
Code explanation:
Here we have a single start task and a single end task (both DummyOperators).
Then from the start task, with a for loop, we create 10 tasks that share the same Python callable. The tasks are created in the function create_dynamic_task.
To each Python callable we pass as arguments the total number of parallel tasks and the current task index.
Suppose you have 1000 items to elaborate: the first task will receive as input that it should elaborate the first of 10 chunks. It will divide the 1000 items into 10 chunks and elaborate the first one. A sketch of what such a callable might look like follows below.
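The snippet above references a parallelTask callable without showing it. Here is a sketch of what it might look like, where fetch_items() and process() stand in for your own data loading and per-item work:

def parallelTask(current_task_number, parallel_tasks_total_number, **kwargs):
    items = fetch_items()  # e.g. the 1000 items / 100 files to elaborate
    chunk_size = len(items) // parallel_tasks_total_number
    start = current_task_number * chunk_size
    # the last task also picks up any remainder
    end = None if current_task_number == parallel_tasks_total_number - 1 else start + chunk_size
    for item in items[start:end]:
        process(item)  # your actual per-item work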
The job graph is not generated at run time. Rather, the graph is built when it is picked up by Airflow from your dags folder. So it is not really possible to have a different graph every time the job runs. You can configure a job to build a graph based on a query at load time, but that graph will then stay the same for every subsequent run, which is probably not very useful.
You can design a graph that executes different tasks on every run based on query results, by using a branch operator.
What I have done is to pre-configure a set of tasks and then take the query results and distribute them across the tasks. This is probably better anyway, because if your query returns a lot of results, you probably don't want to flood the scheduler with a lot of concurrent tasks. To be even safer, I also used a pool to make sure my concurrency doesn't get out of hand with an unexpectedly large query.
"""
- This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask
########################################################################
default_args = {
'owner': 'airflow',
'catchup': False,
'depends_on_past': False,
'start_date': datetime(2019, 7, 2, 19, 50, 00),
'email': ['rotten@stackoverflow'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'max_active_runs': 1
}
dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)
totalBuckets = 5
get_orders_query = """
select
o.id,
o.customer
from
orders o
where
o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
and
o.is_test = false
and
o.is_processed = false
"""
###########################################################################################################
# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
return PythonOperator(
task_id=f'order_processing_task_{bucket_number}',
python_callable=runOrderProcessing,
pool='order_processing_pool',
op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
provide_context=True,
dag=dag
)
# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
    orderList = context['ti'].xcom_pull(task_ids='get_orders', key=task_bucket)

    if orderList is not None:
        for order in orderList:
            logging.info(f"Processing Order with Order ID {order['order_id']}, customer ID {order['customer_id']}")
            doStuff(**order)  # doStuff() stands in for the actual per-order processing
# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')
# initialize the task list buckets
tasks = {}
for task_number in range(0, totalBuckets):
tasks[f'order_processing_task_{task_number}'] = []
# populate the task list buckets
# distribute them evenly across the set of buckets
resultCounter = 0
for record in myDatabaseHook.get_records(get_orders_query):
resultCounter += 1
bucket = (resultCounter % totalBuckets)
tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})
# push the order lists into xcom
    for task in list(tasks):  # iterate over a copy of the keys, since empty buckets are deleted below
if len(tasks[task]) > 0:
logging.info(f'Task {task} has {len(tasks[task])} orders.')
context['ti'].xcom_push(key=task, value=tasks[task])
else:
# if we didn't have enough tasks for every bucket
# don't bother running that task - remove it from the list
logging.info(f"Task {task} doesn't have any orders.")
del(tasks[task])
return list(tasks.keys())
###################################################################################################
# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
task_id='clean_xcoms',
mysql_conn_id='airflow_db',
sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
dag=dag)
# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
task_id='get_orders',
python_callable=getOpenOrders,
provide_context=True,
dag=dag
)
get_orders_task.set_upstream(clean_xcoms)
# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
taskBucket = createOrderProcessingTask(bucketNumber)
taskBucket.set_upstream(get_orders_task)
###################################################################################################
Yes, this is possible. I have created an example DAG that demonstrates it.
import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator
main_dag_id = 'DynamicWorkflow2'
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True
}
dag = DAG(
main_dag_id,
schedule_interval="@once",
default_args=args)
def start(*args, **kwargs):
value = Variable.get("DynamicWorkflow_Group1")
logging.info("Current DynamicWorkflow_Group1 value is " + str(value))
def resetTasksStatus(task_id, execution_date):
logging.info("Resetting: " + task_id + " " + execution_date)
dag_folder = conf.get('core', 'DAGS_FOLDER')
dagbag = DagBag(dag_folder)
check_dag = dagbag.dags[main_dag_id]
session = settings.Session()
my_task = check_dag.get_task(task_id)
ti = TaskInstance(my_task, execution_date)
state = ti.current_state()
logging.info("Current state of " + task_id + " is " + str(state))
ti.set_state(None, session)
state = ti.current_state()
logging.info("Updated state of " + task_id + " is " + str(state))
def bridge1(*args, **kwargs):
# You can set this value dynamically e.g., from a database or a calculation
dynamicValue = 2
variableValue = Variable.get("DynamicWorkflow_Group2")
logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))
logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))
variableValue = Variable.get("DynamicWorkflow_Group2")
logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))
# Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
for i in range(dynamicValue):
resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))
def bridge2(*args, **kwargs):
# You can set this value dynamically e.g., from a database or a calculation
dynamicValue = 3
variableValue = Variable.get("DynamicWorkflow_Group3")
logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))
logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))
variableValue = Variable.get("DynamicWorkflow_Group3")
logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))
# Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
for i in range(dynamicValue):
resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))
def end(*args, **kwargs):
logging.info("Ending")
def doSomeWork(name, index, *args, **kwargs):
# Do whatever work you need to do
# Here I will just create a new file
os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')
starting_task = PythonOperator(
task_id='start',
dag=dag,
provide_context=True,
python_callable=start,
op_args=[])
# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
task_id='bridge1',
dag=dag,
provide_context=True,
python_callable=bridge1,
op_args=[])
DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))
for index in range(int(DynamicWorkflow_Group1)):
dynamicTask = PythonOperator(
task_id='firstGroup_' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=['firstGroup', index])
starting_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(bridge1_task)
# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
task_id='bridge2',
dag=dag,
provide_context=True,
python_callable=bridge2,
op_args=[])
DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))
for index in range(int(DynamicWorkflow_Group2)):
dynamicTask = PythonOperator(
task_id='secondGroup_' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=['secondGroup', index])
bridge1_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(bridge2_task)
ending_task = PythonOperator(
task_id='end',
dag=dag,
provide_context=True,
python_callable=end,
op_args=[])
DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))
for index in range(int(DynamicWorkflow_Group3)):
# You can make this logic anything you'd like
# I chose to use the PythonOperator for all tasks
# except the last task will use the BashOperator
if index < (int(DynamicWorkflow_Group3) - 1):
dynamicTask = PythonOperator(
task_id='thirdGroup_' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=['thirdGroup', index])
else:
dynamicTask = BashOperator(
task_id='thirdGroup_' + str(index),
bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
dag=dag)
bridge2_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(ending_task)
# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)
Before running the DAG, create these three Airflow Variables:
airflow variables --set DynamicWorkflow_Group1 1
airflow variables --set DynamicWorkflow_Group2 0
airflow variables --set DynamicWorkflow_Group3 0
You will see that the DAG goes from this:
to this after it has run:
You can find more information about this DAG in my article on creating dynamic workflows on Airflow.