DAG在Airflow中完成运行后如何删除XCOM对象

vij*_*hna 7 airflow apache-airflow airflow-scheduler

我在XCOM中有一个巨大的json文件,稍后dag执行完成后就不需要了,但是我仍然在UI中看到包含所有数据的Xcom对象,一旦DAG运行,是否可以通过编程方式删除XCOM完成。

谢谢

小智 12

您可以通过 sqlalchemy 以编程方式执行清理,以便在数据库结构发生更改时您的解决方案不会中断:

from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.dag_id == "your dag id").delete()
Run Code Online (Sandbox Code Playgroud)

您还可以清除旧的 XCom 数据:

from airflow.utils.db import provide_session
from airflow.models import XCom
from sqlalchemy import func

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()
Run Code Online (Sandbox Code Playgroud)

如果您想在 dag 完成后清除 XCom,我认为最干净的解决方案是使用 DAG 模型类的“on_success_callback”属性:

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context["ti"]["dag_id"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)
Run Code Online (Sandbox Code Playgroud)


Oma*_*r14 10

您必须根据您的 metadatadb(sqllite、PostgreSql、MySql ..)添加一个任务,该任务在 DAG 运行完成后删除 XCOM。

delete_xcom_task = PostgresOperator(
      task_id='delete-xcom-task',
      postgres_conn_id='airflow_db',
      sql="delete from xcom where dag_id=dag.dag_id and 
           task_id='your_task_id' and execution_date={{ ds }}",
      dag=dag)
Run Code Online (Sandbox Code Playgroud)

您可以在运行 dag 之前验证您的查询。

数据分析 -> Ad Hoc 查询 ->airflow_db -> 查询 -> 运行!

xcom元数据