我有一个dag,我们将部署到多个不同的气流实例和我们的airflow.cfg,dags_are_paused_at_creation = True但对于这个特定的dag,我们希望它打开,而不必通过单击UI手动执行.有没有办法以编程方式进行?
我使用Airflow实现的工作流包含任务A,B,C和D.我希望工作流在任务C等待事件.在Airflow中,传感器用于通过轮询某个状态来检查某些条件,如果该条件为真,则会触发工作流中的下一个任务.我的要求是避免投票.这里有一个答案提到了一个rest_api_plugin的气流,它创建了rest_api端点来触发气流CLI - 使用这个插件,我可以在工作流程中触发任务.但是,在我的工作流程中,我希望实现一个等待休息API调用(异步事件)而不进行轮询的任务,一旦收到其余的API请求,任务就会被触发并恢复Airflow工作流程.
避免轮询的原因:效率低下,不按照我们的要求进行扩展.
更新
我按照@Daniel Huang的回答中提到的建议,创建了一个返回False的传感器.这个传感器在task:start_evaluating_cycle中实现,现在这个传感器任务没有检测到任何东西,但总是返回False:
class WaitForEventSensor(BaseSensorOperator):
def poke(self, context):
return False
start_evaluating_cycle = WaitForEventSensor(
task_id="start_evaluating_cycle",
dag=dag,
poke_interval=60*60 # any number will do here, because it not polling just returning false
)
Run Code Online (Sandbox Code Playgroud)
我配置了rest_api_plugin并使用插件我试图标记任务:start_evaluating_cyle完成以继续工作流程.
rest_api_plugin成功执行任务,我可以看到任务是使用flower运行的:
但在工作流程中,任务:start_evaluating_cycle仍处于运行状态:
rest_api_plugin正在运行独立于工作流的任务.如何让rest_api_plugin在工作流程内运行任务 - 而不是独立于工作流程.
但是,当我从气流UI管理员中选择任务并标记成功时:
当我确认时,工作流程会进一步发展 - 这就是我想要的,但我需要在其余的API调用中标记成功.
我担心的是:
我想使用Airflow实施数据流,这些数据流定期轮询外部系统(ftp服务器等),检查是否符合特定条件的新文件,然后为这些文件运行一堆任务。现在,我是Airflow的新手,并且读到Sensor是在这种情况下要使用的东西,实际上我设法编写了一个在运行“ airflow test”时可以正常工作的传感器。但是我对于传感器的poke_interval和DAG调度的关系有些困惑。如何为用例定义这些设置?还是应该使用其他方法?我只希望Airflow在这些文件可用时运行任务,而不是在一段时间内没有新文件可用时使仪表板充满故障。