vij*_*hna 7 airflow apache-airflow airflow-scheduler
我在XCOM中有一个巨大的json文件,稍后dag执行完成后就不需要了,但是我仍然在UI中看到包含所有数据的Xcom对象,一旦DAG运行,是否可以通过编程方式删除XCOM完成。
谢谢
小智 12
您可以通过 sqlalchemy 以编程方式执行清理,以便在数据库结构发生更改时您的解决方案不会中断:
from airflow.utils.db import provide_session
from airflow.models import XCom
@provide_session
def cleanup_xcom(session=None):
session.query(XCom).filter(XCom.dag_id == "your dag id").delete()
Run Code Online (Sandbox Code Playgroud)
您还可以清除旧的 XCom 数据:
from airflow.utils.db import provide_session
from airflow.models import XCom
from sqlalchemy import func
@provide_session
def cleanup_xcom(session=None):
session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()
Run Code Online (Sandbox Code Playgroud)
如果您想在 dag 完成后清除 XCom,我认为最干净的解决方案是使用 DAG 模型类的“on_success_callback”属性:
from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
@provide_session
def cleanup_xcom(context, session=None):
dag_id = context["ti"]["dag_id"]
session.query(XCom).filter(XCom.dag_id == dag_id).delete()
dag = DAG( ...
on_success_callback=cleanup_xcom,
)
Run Code Online (Sandbox Code Playgroud)
Oma*_*r14 10
您必须根据您的 metadatadb(sqllite、PostgreSql、MySql ..)添加一个任务,该任务在 DAG 运行完成后删除 XCOM。
delete_xcom_task = PostgresOperator(
task_id='delete-xcom-task',
postgres_conn_id='airflow_db',
sql="delete from xcom where dag_id=dag.dag_id and
task_id='your_task_id' and execution_date={{ ds }}",
dag=dag)
Run Code Online (Sandbox Code Playgroud)
您可以在运行 dag 之前验证您的查询。
数据分析 -> Ad Hoc 查询 ->airflow_db -> 查询 -> 运行!
| 归档时间: |
|
| 查看次数: |
2348 次 |
| 最近记录: |