我有一个由三个运算符组成的简单DAG。第一个是PythonOperator我们自己的功能,另外两个是标准的运营商从airflow.contrib(FileToGoogleCloudStorageOperator并GoogleCloudStorageToBigQueryOperator要准确)。它们按顺序工作。根据参数,我们的自定义任务会生成许多文件,通常在2到5之间。所有这些文件都必须由后续任务分别处理。这意味着我想要几个下游分支,但在运行DAG之前确切知道有多少个分支。
您将如何解决这个问题?
更新:
使用BranchPythonOperator他在另一个答复中提到的jhnclvr 作为出发点,我创建了一个根据条件跳过或继续执行分支的运算符。该方法仅是可行的,因为已知最大可能的分支数并且足够小。
运营商:
class SkipOperator(PythonOperator):
def execute(self, context):
boolean = super(SkipOperator, self).execute(context)
session = settings.Session()
for task in context['task'].downstream_list:
if boolean is False:
ti = TaskInstance(
task, execution_date=context['ti'].execution_date)
ti.state = State.SKIPPED
ti.start_date = datetime.now()
ti.end_date = datetime.now()
session.merge(ti)
session.commit()
session.close()
Run Code Online (Sandbox Code Playgroud)
用法:
def check(i, templates_dict=None, **kwargs):
return len(templates_dict["data_list"].split(",")) > i
dag = DAG(
dag_name,
default_args=default_args,
schedule_interval=None
)
load = CustomOperator(
task_id="load_op",
bash_command=' '.join([
'./command.sh'
'--data-list {{ …Run Code Online (Sandbox Code Playgroud) airflow ×1