我仍在部署过程中Airflow,我已经感觉到需要将 operators合并在一起。最常见的用例是将运算符和相应的sensor. 例如,人们可能希望将EmrStepOperator和链接在一起EmrStepSensor。
我正在DAG以编程方式创建我的s ,其中最大的一个包含 150 多个(相同的)分支,每个分支对不同的数据位(表)执行相同的一系列操作。因此,将构成DAG 中单个逻辑步骤的任务放在一起会很有帮助。
以下是我项目中的 2 个相互竞争的例子,为我的论点提供动力。
1.从S3路径删除数据然后写入新数据
此步骤包括 2 个操作员
DeleteS3PathOperator: 扩展自BaseOperator& 使用S3HookHadoopDistcpOperator: 从 SSHOperator2. 有条件地MSCK REPAIR在Hive桌子上表演
这一步包含4个操作符
BranchPythonOperator: 检查 Hive 表是否分区MsckRepairOperator:从扩展HiveOperator和执行MSCK上(REPAIR进行分配)表Dummy(Branch)Operator:使向上交替的分支路径,以MsckRepairOperator(对于非分区表)Dummy(Join)Operator: 组成两个分支的连接步骤使用运营商的隔离无疑提供了更小的模块,更细粒度的日志/调试,但在大的DAG,减少杂波可能是可取的。根据我目前的理解,有两种方法可以将操作符链接在一起
Hook秒
在钩子中编写实际的处理逻辑,然后在单个运算符中使用任意数量的钩子(在我看来当然是更好的方法)
SubDagOperator
我的问题是
更新-1
3. 多重继承
虽然这是一个Python特性而不是Airflow特定的,但值得指出的是,多重继承在组合运算符的功能时可以派上用场。QuboleCheckOperator,例如,已经使用它编写。然而,在过去,我曾尝试用这件事来融合EmrCreateJobFlowOperatorand EmrJobFlowSensor,但当时我遇到了装饰器的问题并放弃了这个想法。@apply_defaults
我根据我的需要组合了各种钩子来创建单个运算符。一个简单的例子是我在钩子中结合了 gcs delete、copy、list 方法和 get_size 方法来创建一个名为 的操作符GcsDataValidationOperator。经验法则是具有幂等性,即如果运行多次,它应该产生相同的结果。
运算符到底应该组合还是采用离散步骤更好?
唯一的陷阱是可维护性,有时当主分支中的钩子发生变化时,如果有任何重大更改,您将需要手动更新所有操作员。
上述方法有什么陷阱或改进吗?
您可以使用PythonOperator内置的钩子 with.execute方法,但这仍然意味着 DAG 文件中的大量细节。因此,我仍然会选择新的操作方法
还有其他方法可以将运算符组合在一起吗?
Hooks 只是与外部平台和数据库(如 Hive、GCS 等)的接口,并构成操作员的构建块。这允许创建新的运算符。此外,这意味着您可以自定义模板化字段,在新操作员内的每个细粒度步骤上添加松弛通知,并拥有自己的日志记录详细信息。
在Airflow的分类中,Hooks的主要动机与上面相同,还是还有其他目的?
FWIW:我是 PMC 成员,也是 Airflow 项目的贡献者。
| 归档时间: |
|
| 查看次数: |
1496 次 |
| 最近记录: |