相关疑难解决方法(0)

气流以编程方式取消暂停?

我有一个dag,我们将部署到多个不同的气流实例和我们的airflow.cfg,dags_are_paused_at_creation = True但对于这个特定的dag,我们希望它打开,而不必通过单击UI手动执行.有没有办法以编程方式进行?

airflow apache-airflow airflow-scheduler

9
推荐指数
3
解决办法
4768
查看次数

如何在使用Airflow实现的工作流中等待DAG任务中的异步事件?

我使用Airflow实现的工作流包含任务A,B,C和D.我希望工作流在任务C等待事件.在Airflow中,传感器用于通过轮询某个状态来检查某些条件,如果该条件为真,则会触发工作流中的下一个任务.我的要求是避免投票.这里有一个答案提到了一个rest_api_plugin的气流,它创建了rest_api端点来触发气流CLI - 使用这个插件,我可以在工作流程中触发任务.但是,在我的工作流程中,我希望实现一个等待休息API调用(异步事件)而不进行轮询的任务,一旦收到其余的API请求,任务就会被触发并恢复Airflow工作流程.

避免轮询的原因:效率低下,不按照我们的要求进行扩展.

更新

我按照@Daniel Huang的回答中提到的建议,创建了一个返回False的传感器.这个传感器在task:start_evaluating_cycle中实现,现在这个传感器任务没有检测到任何东西,但总是返回False:

class WaitForEventSensor(BaseSensorOperator):

    def poke(self, context):
        return False

start_evaluating_cycle = WaitForEventSensor(
    task_id="start_evaluating_cycle",
    dag=dag,
    poke_interval=60*60 # any number will do here, because it not polling just returning false
)
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

我配置了rest_api_plugin并使用插件我试图标记任务:start_evaluating_cyle完成以继续工作流程.

  • 在此输入图像描述

rest_api_plugin成功执行任务,我可以看到任务是使用flower运行的:

  • 在此输入图像描述

但在工作流程中,任务:start_evaluating_cycle仍处于运行状态:

  • 项目清单

rest_api_plugin正在运行独立于工作流的任务.如何让rest_api_plugin在工作流程内运行任务 - 而不是独立于工作流程.

但是,当我从气流UI管理员中选择任务并标记成功时:

  • 项目清单

我需要这个网址:http:// localhost:8080/admin/airflow/success?task_id = start_evaluating_cycle&dag_id = faculty_evaluation_workflow&upstream = false&downstream = false&future = false&past = false&execution_date = 2017-11-26T06:48:54.297289&origin = http%3A% 2F%2Flocalhost%3A8080%2Fadmin%2Fairflow%2Fgraph%3Fexecution_date%3D2017-11-26T06%253A48%253A54.297289%26arrange%3DTB%26root%3D%26dag_id%3Dfaculty_evaluation_workflow%26_csrf_token%3DImM3NmU4ZTVjYTljZTQzYWJhNGI4Mzg2MmZmNDU5OGYxYWY0ODAxYWMi.DPv1Ww.EnWS6ffVLNcs923y6eVRV_8R-X8

当我确认时,工作流程会进一步发展 - 这就是我想要的,但我需要在其余的API调用中标记成功.

我担心的是:

  1. 如何使用
    rest_api_plugin 将工作流内运行的任务标记为成功?
  2. 是否可以使用气流管理员创建的URL,通过从外部系统调用任务来将任务标记为成功?

airflow apache-airflow

9
推荐指数
1
解决办法
2603
查看次数

如何在Airflow中实施轮询?

我想使用Airflow实施数据流,这些数据流定期轮询外部系统(ftp服务器等),检查是否符合特定条件的新文件,然后为这些文件运行一堆任务。现在,我是Airflow的新手,并且读到Sensor是在这种情况下要使用的东西,实际上我设法编写了一个在运行“ airflow test”时可以正常工作的传感器。但是我对于传感器的poke_interval和DAG调度的关系有些困惑。如何为用例定义这些设置?还是应该使用其他方法?我只希望Airflow在这些文件可用时运行任务,而不是在一段时间内没有新文件可用时使仪表板充满故障。

airflow apache-airflow

0
推荐指数
1
解决办法
1213
查看次数