Che*_*n J 9 python airflow apache-airflow
我需要任务的状态,如果它在同一个dag中运行或更新或失败.所以我试图使用下面的代码得到它,虽然我没有输出...
Auto = PythonOperator(
task_id='test_sleep',
python_callable=execute_on_emr,
op_kwargs={'cmd':'python /home/hadoop/test/testsleep.py'},
dag=dag)
logger.info(Auto)
Run Code Online (Sandbox Code Playgroud)
一旦气流上的特定任务完成,目的是杀死某些正在运行的任务.
问题是我如何获得任务的状态,例如它处于运行状态还是失败或成功
我在做类似的事情。我需要检查一项任务是否成功完成了另一项任务的前10次。taky2使我走上了正确的道路。实际上很简单:
from airflow.models import TaskInstance
ti = TaskInstance(*your_task*, execution_date)
state = ti.current_state()
Run Code Online (Sandbox Code Playgroud)
由于我想检查dag中的内容,因此不必指定dag。我只是创建了一个函数来遍历过去的n_days并检查状态。
def check_status(**kwargs):
last_n_days = 10
for n in range(0,last_n_days):
date = kwargs['execution_date']- timedelta(n)
ti = TaskInstance(*my_task*, date) #my_task is the task you defined within the DAG rather than the task_id (as in the example below: check_success_task rather than 'check_success_days_before')
state = ti.current_state()
if state != 'success':
raise ValueError('Not all previous tasks successfully completed.')
Run Code Online (Sandbox Code Playgroud)
调用该函数时,请确保设置provide_context。
check_success_task = PythonOperator(
task_id='check_success_days_before',
python_callable= check_status,
provide_context=True,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
更新:当您想从另一个dag调用任务时,您需要这样调用它:
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
dag_folder = conf.get('core','DAGS_FOLDER')
dagbag = DagBag(dag_folder)
check_dag = dagbag.dags[*my_dag_id*]
my_task = check_dag.get_task(*my_task_id*)
ti = TaskInstance(my_task, date)
Run Code Online (Sandbox Code Playgroud)
看一下 Priyank 建议的负责命令行界面操作的代码。
def task_state(args):
dag = get_dag(args)
task = dag.get_task(task_id=args.task_id)
ti = TaskInstance(task, args.execution_date)
print(ti.current_state())
Run Code Online (Sandbox Code Playgroud)
因此,您似乎应该能够使用类似的代码在 DAG 代码库中轻松完成此操作。
或者,您可以使用 pythonsubprocess库从代码中执行这些 CLI 操作。
好吧,我想我知道你在做什么,我真的不同意,但我会从一个答案开始.
一种直截了当但又带有hackish的方法是查询task_instance表.我在postgres,但结构应该是相同的.首先通过db调用获取您感兴趣的task_ids和任务状态.
SELECT task_id, state
FROM task_instance
WHERE dag_id = '<dag_id_attrib>'
AND execution_date = '<execution_date_attrib>'
AND task_id = '<task_to_check>'
Run Code Online (Sandbox Code Playgroud)
这应该为您提供您要监控的任务的状态(以及名称,以供参考).状态存储为简单的小写字符串.
您可以使用命令行界面来执行此操作:
airflow task_state [-h] [-sd SUBDIR] dag_id task_id execution_date
Run Code Online (Sandbox Code Playgroud)
有关更多信息,您可以参考官方气流文档:
http://airflow.incubator.apache.org/cli.html