我有一个由三个运算符组成的简单DAG。第一个是PythonOperator我们自己的功能,另外两个是标准的运营商从airflow.contrib(FileToGoogleCloudStorageOperator并GoogleCloudStorageToBigQueryOperator要准确)。它们按顺序工作。根据参数,我们的自定义任务会生成许多文件,通常在2到5之间。所有这些文件都必须由后续任务分别处理。这意味着我想要几个下游分支,但在运行DAG之前确切知道有多少个分支。
您将如何解决这个问题?
更新:
使用BranchPythonOperator他在另一个答复中提到的jhnclvr 作为出发点,我创建了一个根据条件跳过或继续执行分支的运算符。该方法仅是可行的,因为已知最大可能的分支数并且足够小。
运营商:
class SkipOperator(PythonOperator):
def execute(self, context):
boolean = super(SkipOperator, self).execute(context)
session = settings.Session()
for task in context['task'].downstream_list:
if boolean is False:
ti = TaskInstance(
task, execution_date=context['ti'].execution_date)
ti.state = State.SKIPPED
ti.start_date = datetime.now()
ti.end_date = datetime.now()
session.merge(ti)
session.commit()
session.close()
Run Code Online (Sandbox Code Playgroud)
用法:
def check(i, templates_dict=None, **kwargs):
return len(templates_dict["data_list"].split(",")) > i
dag = DAG(
dag_name,
default_args=default_args,
schedule_interval=None
)
load = CustomOperator(
task_id="load_op",
bash_command=' '.join([
'./command.sh'
'--data-list {{ dag_run.conf["data_list"]|join(",") }}'
]),
dag=dag
)
for i in range(0, 5):
condition = SkipOperator(
task_id=f"{dag_name}_condition_{i}",
python_callable=partial(check, i),
provide_context=True,
templates_dict={"data_list": '{{ dag_run.conf["data_list"]|join(",") }}'},
dag=dag
)
gs_filename = 'prefix_{{ dag_run.conf["data_list"][%d] }}.json' % i
load_to_gcs = CustomFileToGoogleCloudStorageOperator(
task_id=f"{dag_name}_to_gs_{i}",
src='/tmp/{{ run_id }}_%d.{{ dag_run.conf["file_extension"] }}' % i,
bucket=gs_bucket,
dst=gs_filename,
mime_type='application/json',
google_cloud_storage_conn_id=connection_id,
dag=dag
)
load_to_bq = GoogleCloudStorageToBigQueryOperator(
task_id=f"{dag_name}_to_bq_{i}",
bucket=gs_bucket,
source_objects=[gs_filename, ],
source_format='NEWLINE_DELIMITED_JSON',
destination_project_dataset_table='myproject.temp_{{ dag_run.conf["data_list"][%d] }}' % i,
bigquery_conn_id=connection_id,
schema_fields={},
google_cloud_storage_conn_id=connection_id,
write_disposition='WRITE_TRUNCATE',
dag=dag
)
condition.set_upstream(load)
load_to_gcs.set_upstream(condition)
load_to_bq.set_upstream(load_to_gcs)
Run Code Online (Sandbox Code Playgroud)
基本上,您无法在任务DAG运行时为其添加任务。您需要提前知道要添加多少个任务。
您可以使用一个运算符处理N个文件。
或者,如果您还有另一个处理文件的dag,则可以触发该DAGN次,在conf中传递文件名。
请参阅此处的示例TriggerDagRunOperator。
| 归档时间: |
|
| 查看次数: |
1665 次 |
| 最近记录: |