在Airflow中是否有任何方法可以创建工作流程,以便任务数量B.*在任务A完成之前是未知的?我查看了子标记,但看起来它只能用于必须在Dag创建时确定的一组静态任务.
dag会触发工作吗?如果是这样,请你举个例子.
我有一个问题是,在任务A完成之前,无法知道计算任务C所需的任务B的数量.每个任务B.*将需要几个小时来计算,不能合并.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
Run Code Online (Sandbox Code Playgroud)
我不喜欢这个解决方案,因为我必须创建一个阻塞的ExternalTaskSensor,所有的任务B.*需要2到24小时才能完成.所以我认为这不是一个可行的解决方案.当然有一种更简单的方法吗?或者Airflow不是为此而设计的?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Run Code Online (Sandbox Code Playgroud)
关于"动态任务"的其他问题似乎涉及在计划或设计时动态构建DAG.我有兴趣在执行期间动态地将任务添加到DAG.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG('test_dag', description='a test',
schedule_interval='0 0 * * *',
start_date=datetime(2018, 1, 1),
catchup=False)
def make_tasks():
du1 = DummyOperator(task_id='dummy1', dag=dag)
du2 = DummyOperator(task_id='dummy2', dag=dag)
du3 = DummyOperator(task_id='dummy3', dag=dag)
du1 >> du2 >> du3
p = PythonOperator(
task_id='python_operator',
dag=dag,
python_callable=make_tasks)
Run Code Online (Sandbox Code Playgroud)
这种天真的实现似乎不起作用 - 虚拟任务永远不会出现在UI中.
在执行期间向DAG添加新运算符的正确方法是什么?可能吗?
我有一个 DAG,它是通过查询 DynamoDB 的列表创建的,对于列表中的每个项目,使用 PythonOperator 创建一个任务并将其添加到 DAG。在下面的示例中未显示,但重要的是要注意列表中的某些项目依赖于其他任务,因此我使用它set_upstream来强制执行依赖关系。
- airflow_home
\- dags
\- workflow.py
Run Code Online (Sandbox Code Playgroud)
工作流.py
def get_task_list():
# ... query dynamodb ...
def run_task(task):
# ... do stuff ...
dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
t = PythonOperator(
task_id=task['id'],
provide_context=False,
dag=dag,
python_callable=run_task,
op_args=[task]
)
Run Code Online (Sandbox Code Playgroud)
问题是workflow.py一遍又一遍地运行(每次任务运行时?),我的get_task_list()方法受到 AWS 的限制并抛出异常。
我认为这是因为无论何时run_task()被调用,它都会运行所有全局变量,workflow.py所以我尝试run_task()进入一个单独的模块,如下所示:
- airflow_home
\- dags
\- workflow.py
\- mypackage
\- __init__
\- task.py
Run Code Online (Sandbox Code Playgroud)
但这并没有改变任何事情。我什至尝试放入get_task_list()一个用工厂函数包装的 SubDagOperator,它的行为方式仍然相同。 …
我们有一个 kubernetes pod 操作符,它将输出一个 Python 字典,该字典将定义哪些更进一步的下游 kubernetes pod 操作符与它们的依赖项和环境变量一起运行,以传递给每个操作符。
我如何让这个 python 字典对象回到执行器的上下文(或者它是工作器的上下文?),以便气流可以产生下游 kubernetes 操作符?
我看过 BranchOperator 和 TriggerDagRunOperator 和 XCOM push/pull 和 Variable.get 和 Variable.set,但似乎没有什么工作。