以 DAG 方式调度作业

Joa*_*him 5 python linux cron scheduled-tasks airflow

我们有一个包含不同类型工作的系统。例如,让我们称它们为:

job_1
job_2
job_3
Run Code Online (Sandbox Code Playgroud)

它们都需要不同的参数集(和可选参数)。即我们job_1(x)为不同的x= A, B, C ...job_2运行一组参数,这些参数取决于结果job_1(x)job_2加载job_A(x)存储的数据。等等。

结果是依赖关系的树结构。现在,这些工作偶尔会因某种原因而失败。因此,如果job_Aforx=B失败,该树的分支将完全失败并且不应运行。所有其他分支都应该运行。

所有作业都用 Python 编写并使用并行性(基于产生 SLURM 作业)。它们是用 cron 安排的。这显然不是很好,有两个主要缺点:

  • 调试起来非常困难。无论树中较高的作业是否失败,所有作业都会运行。如果不深入了解依赖关系,就很难看出问题出在哪里。
  • 如果更高的作业(例如job_A)未完成,则job_B可能会计划运行,并且会失败或基于过时的日期运行。

为了解决这个问题,我们正在研究用于调度或可视化的气流,因为它是用 Python 编写的,它似乎大致符合我们的需求。不过,我看到了不同的挑战:

  • 作业的依赖关系树要么是很一般(即job_B取决于job_A)或非常宽(即job_B(y)对100个参数依赖job_A(x=A)。在第一种情况下的可视化树将有大约10片叶子,但会使得调试非常困难,因为这项工作可能只是对于某个参数失败了。后一种情况下的可视化树会很宽,大约有 300 个叶子。它会更准确,但可视化可能很难阅读。我们可以过滤失败的作业,看看它们的依赖关系吗?
  • 我们在作业中有并行性(我们需要它,否则作业运行超过一天,我们想每天重新运行整个批次)这是否会破坏我们的日程安排?
  • 我们希望尽可能少地改变我们的工作和数据管理。
  • 我们能否以一种易于理解的方式实施关于接下来产生哪些工作的规则系统?

气流是一个不错的选择吗?我知道还有其他一些(luigi、Azkaban 等)与 Hadoop 堆栈有些相关(我们没有使用它,因为它不是大数据)。需要多少黑客攻击?多少黑客行为是明智的?

Cha*_*ley 2

我不能真正代表气流,但我可以代表路易吉。

luigi 的简要概述:Luigi 是为数据流和数据依赖性而设计的,就像 Airflow 一样,但它是在 Spotify 开发的。数据流中的每个步骤都表示为一个继承自 luigi.Task 的类,我们将每个步骤称为任务。每个任务都由三个主要函数组成,并且还具有参数声明。三个函数及其说明:

  1. 需要:在此函数中,您可以通过返回这些任务来指定当前任务所依赖的任务。
  2. 输出:在此函数中,您可以通过返回 Luigi.LocalTarget 类(或类似但用于远程)的对象来指定保存此任务结果的位置。
  3. 运行:在此函数中,您指定任务运行时实际发生的情况。

注意:luigi 中央调度程序通过检查文件是否存在来了解任务何时完成 - 特别是在要运行的任务的 require 函数中返回的任务的输出函数中指定的文件。

我们可以过滤失败的作业,并只查看它们的依赖关系吗?

Luigi 记录传递给每个任务的所有参数以及运行每个任务的每次尝试。默认情况下,luigi 不保存日志,但您可以轻松设置。去年夏天我做了一个大的路易吉管道,我用它保存了日志。然后,它使用模糊字符串比较(使用 Levenshtein 库)来删除重复行并严重压缩日志,然后基本上搜索单词“错误”,如果出现,那么它会向我发送一封电子邮件,其中包含压缩后的内容登录它。

我们在作业中具有并行性(我们需要它,否则作业会运行超过一天,并且我们希望每天重新运行整个作业),这会扰乱我们的调度吗?

我运行其中并行的任务没有任何问题。但有些库可能会导致问题,例如 gensim。

我们希望尽可能少地改变我们的工作和数据管理。

通常,您可以将大部分计算粘贴到 luigi 任务的运行函数中。

我们能否以一种易于理解的方式实现下一步产生哪些工作的规则系统?

我相信是的,是的。对于每个任务,您可以在任务的 require 函数中指定它所依赖的任务。

其他需要考虑的事情是文档。Luigi 的文档相当不错,但还没有那么流行。Luigi 的社区不是很大,也不是非常活跃。据我所知,Airflow 相当可比,但它较新,因此目前可能有一个更活跃的社区。

是 luigi 的作者写的一篇博客文章,其中对 luigi 和较新的替代方案进行了一些简短的比较。他的结论是:它们都很糟糕。包括路易吉。