将运算符融合在一起

y2k*_*ham 6 airflow

我仍在部署过程中Airflow,我已经感觉到需要 operators合并在一起。最常见的用例是运算符和相应的sensor. 例如,人们可能希望将EmrStepOperator和链接在一起EmrStepSensor


我正在DAG编程方式创建我的s ,其中最大的一个包含 150 多个(相同的)分支,每个分支对不同的数据位(表)执行相同的一系列操作。因此,将构成DAG 中单个逻辑步骤的任务放在一起会很有帮助。

以下是我项目中的 2 个相互竞争的例子,为我的论点提供动力。

1.从S3路径删除数据然后写入新数据

此步骤包括 2 个操作员

  • DeleteS3PathOperator: 扩展自BaseOperator& 使用S3Hook
  • HadoopDistcpOperator: 从 SSHOperator

2. 有条件地MSCK REPAIRHive桌子上表演

这一步包含4个操作符

  • BranchPythonOperator: 检查 Hive 表是否分区
  • MsckRepairOperator:从扩展HiveOperator和执行MSCK上(REPAIR进行分配)表
  • Dummy(Branch)Operator:使向上交替的分支路径,以MsckRepairOperator(对于非分区表)
  • Dummy(Join)Operator: 组成两个分支的连接步骤

使用运营商的隔离无疑提供了更小的模块,更细粒度的日志/调试,但在大的DAG,减少杂波可能是可取的。根据我目前的理解,有两种方法可以将操作符链接在一起

  1. Hook

    在钩子中编写实际的处理逻辑,然后在单个运算符中使用任意数量的钩子(在我看来当然是更好的方法)

  2. SubDagOperator

    一种有风险有争议的做事方式;此外,SubDagOperator 的命名约定让我皱眉


我的问题是

  • 运算符应该完全组合还是具有离散步骤更好?
  • 任何陷阱,上述方法的改进?
  • 还有其他方法可以运算符组合在一起吗?
  • 在Airflow 的分类中,Hooks 的主要动机是否与上述相同,或者它们是否也有其他用途?

更新-1

3. 多重继承

虽然这是一个Python特性而不是Airflow特定的,但值得指出的是,多重继承在组合运算符的功能时可以派上用场。QuboleCheckOperator,例如,已经使用它编写。然而,在过去,我曾尝试用这件事来融合EmrCreateJobFlowOperatorand EmrJobFlowSensor,但当时我遇到了装饰器的问题并放弃了这个想法。@apply_defaults

kax*_*xil 3

我根据我的需要组合了各种钩子来创建单个运算符。一个简单的例子是我在钩子中结合了 gcs delete、copy、list 方法和 get_size 方法来创建一个名为 的操作符GcsDataValidationOperator。经验法则是具有幂等性,即如果运行多次,它应该产生相同的结果。

运算符到底应该组合还是采用离散步骤更好?

唯一的陷阱是可维护性,有时当主分支中的钩子发生变化时,如果有任何重大更改,您将需要手动更新所有操作员。

上述方法有什么陷阱或改进吗?

您可以使用PythonOperator内置的钩子 with.execute方法,但这仍然意味着 DAG 文件中的大量细节。因此,我仍然会选择新的操作方法

还有其他方法可以将运算符组合在一起吗?

Hooks 只是与外部平台和数据库(如 Hive、GCS 等)的接口,并构成操作员的构建块。这允许创建新的运算符。此外,这意味着您可以自定义模板化字段,在新操作员内的每个细粒度步骤上添加松弛通知,并拥有自己的日志记录详细信息。

在Airflow的分类中,Hooks的主要动机与上面相同,还是还有其他目的?

FWIW:我是 PMC 成员,也是 Airflow 项目的贡献者。