(Django)气流中的ORM - 有可能吗?

Ple*_*eea 10 django orm django-models airflow apache-airflow

如何在Airflow任务中使用Django模型?

根据官方的Airflow文档,Airflow提供了与数据库(如MySqlHook/PostgresHook/etc)交互的钩子,这些钩子稍后可以在运算符中用于行查询执行.附加核心代码片段:

https://airflow.apache.org/_modules/mysql_hook.html复制

class MySqlHook(DbApiHook):
    conn_name_attr = 'mysql_conn_id'
    default_conn_name = 'mysql_default'
    supports_autocommit = True

    def get_conn(self):
        """
        Returns a mysql connection object
        """
        conn = self.get_connection(self.mysql_conn_id)
        conn_config = {
            "user": conn.login,
            "passwd": conn.password or ''
        }
        conn_config["host"] = conn.host or 'localhost'
        conn_config["db"] = conn.schema or ''
        conn = MySQLdb.connect(**conn_config)
        return conn
Run Code Online (Sandbox Code Playgroud)

https://airflow.apache.org/_modules/mysql_operator.html复制

class MySqlOperator(BaseOperator):
    @apply_defaults
    def __init__(
            self, sql, mysql_conn_id='mysql_default', parameters=None,
            autocommit=False, *args, **kwargs):
        super(MySqlOperator, self).__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        logging.info('Executing: ' + str(self.sql))
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        hook.run(
            self.sql,
            autocommit=self.autocommit,
            parameters=self.parameters)
Run Code Online (Sandbox Code Playgroud)

我们可以看到Hook封装了连接配置,而Operator提供了执行自定义查询的能力.

问题:

由于以下原因,使用不同的ORM来获取和处理数据库对象而不是原始SQL非常方便:

  1. 在简单的情况下,ORM可以是一个更方便的解决方案,请参阅ORM定义.
  2. 假设已经建立了像Django这样的已定义模型及其方法的系统.每次这些模型的模式发生变化时,都需要重写气流原始SQL查询.ORM提供了一个统一的界面来处理这些模型.

出于某种原因,在钩子和运算符方面没有在Airflow任务中使用ORM的示例.根据Django之外的Django数据库层使用?问题是,需要为数据库设置连接配置,然后在ORM中直接执行queires,但在适当的钩子/运算符之外执行此操作会破坏Airflow 原则.这就像用"python work_with_django_models.py"命令调用BashOperator .

最后,我们想要这个:

那么这种情况下最好的实践是什么?我们是否共享Django ORM /其他ORM的钩子/操作符?为了使下面的代码真实(视为伪代码!):

import os
import django
os.environ.setdefault(
    "DJANGO_SETTINGS_MODULE",
    "myapp.settings"
)
django.setup()
from your_app import models

def get_and_modify_models(ds, **kwargs):
    all_objects = models.MyModel.objects.filter(my_str_field = 'abc')
    all_objects[15].my_int_field = 25
    all_objects[15].save()
    return list(all_objects)

django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')
Run Code Online (Sandbox Code Playgroud)

而不是在原始SQL中实现此功能.

我认为这是非常重要的话题,因为在这种情况下,基于ORM的框架和流程的整个流程无法潜入Airflow.

提前致谢!

Rya*_*ack 12

我同意我们应该继续讨论,因为访问Django ORM可以显着降低解决方案的复杂性.

我的方法是1)创建一个DjangoOperator

import os, sys

from airflow.models import BaseOperator


def setup_django_for_airflow():
    # Add Django project root to path
    sys.path.append('./project_root/')

    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

    import django
    django.setup()


class DjangoOperator(BaseOperator):

    def pre_execute(self, *args, **kwargs):
        setup_django_for_airflow()
Run Code Online (Sandbox Code Playgroud)

2)为逻辑/运算符扩展DjangoOperator,从访问ORM中获益

from .base import DjangoOperator


class DjangoExampleOperator(DjangoOperator):

    def execute(self, context):
        from myApp.models import model
        model.objects.get_or_create()
Run Code Online (Sandbox Code Playgroud)

通过此策略,您可以区分使用Raw SQL/ORM的运算符.另请注意,对于Django运算符,所有django模型导入都需要在执行上下文中,如上所示.