在 Airflow 中实现等待特定标准的有效方法

sha*_*rky 7 python airflow airflow-scheduler

Airflow 中的传感器- 是某种类型的操作符,会一直运行直到满足某个标准,但它们会占用一个完整的工作槽。好奇人们是否能够可靠地使用更有效的方法来实现这一点。

我的一些想法

  • 使用池来限制分配给传感器的工作槽数量
  • 跳过下游的所有任务,然后通过外部触发器清除和恢复
  • 暂停 DAG 的运行并通过外部触发器再次恢复

其他相关链接:

gor*_*ros 6

新版本的 Airflow,即 1.10.2 为传感器提供了新的选项,我认为这解决了您的担忧:

mode (str) – 传感器的工作方式。选项是:{ poke | 重新安排},默认是戳。当设置为 poke 时,传感器在整个执行时间内占用一个工作槽,并在 poke 之间休眠。如果传感器的预期运行时间较短或需要较短的戳间隔时间,请使用此模式。当设置为重新调度时,传感器任务会在尚未满足条件时释放工作槽,并在稍后重新调度。如果满足条件的预期时间为,则使用此模式。poke 间隔应该超过一分钟,以防止调度程序负载过多。

这是文档的链接


Eri*_*and 5

我认为您需要退后一步并质疑为什么传感器占用一个完整的工作槽是一个问题。

Airflow 是一个调度器,而不是一个资源分配器。使用工作线程并发、池和队列,您可以限制资源使用,但只是非常粗略。最后,Airflow 天真地假设传感器将在工作节点上使用与 BashOperator 相同的资源,后者产生多进程基因组测序实用程序。但是传感器很便宜并且 99.9% 的时间都在休眠,所以这是一个糟糕的假设。

因此,如果您想解决传感器占用所有工作器插槽的问题,只需提高工作器的并发性。您应该能够在一个 worker 上同时运行数百个传感器。

如果随后在集群节点和具有危险高系统负载的节点上遇到工作负载分布非常不均匀的问题,您可以使用以下任一方法来限制昂贵的作业数量:

  • 昂贵的作业必须消耗的池(将启动作业并等待池资源可用)。这会创建集群范围的限制。
  • 每个节点上的特殊工作人员只接受昂贵的工作(使用airflow worker --queues my_expensive_queue)并且具有低并发设置。这会创建每个节点的限制。

如果您有比这更复杂的要求,那么考虑将所有非平凡的计算作业传送到专用资源分配器,例如Apache Mesos,您可以在其中指定确切的 CPU、内存和其他要求,以确保更有效地分配集群负载在每个节点上,Airflow 都无法做到。