Ple*_*eea 10 django orm django-models airflow apache-airflow
如何在Airflow任务中使用Django模型?
根据官方的Airflow文档,Airflow提供了与数据库(如MySqlHook/PostgresHook/etc)交互的钩子,这些钩子稍后可以在运算符中用于行查询执行.附加核心代码片段:
class MySqlHook(DbApiHook):
conn_name_attr = 'mysql_conn_id'
default_conn_name = 'mysql_default'
supports_autocommit = True
def get_conn(self):
"""
Returns a mysql connection object
"""
conn = self.get_connection(self.mysql_conn_id)
conn_config = {
"user": conn.login,
"passwd": conn.password or ''
}
conn_config["host"] = conn.host or 'localhost'
conn_config["db"] = conn.schema or ''
conn = MySQLdb.connect(**conn_config)
return conn
Run Code Online (Sandbox Code Playgroud)
class MySqlOperator(BaseOperator):
@apply_defaults
def __init__(
self, sql, mysql_conn_id='mysql_default', parameters=None,
autocommit=False, *args, **kwargs):
super(MySqlOperator, self).__init__(*args, **kwargs)
self.mysql_conn_id = mysql_conn_id
self.sql = sql
self.autocommit = autocommit
self.parameters = parameters
def execute(self, context):
logging.info('Executing: ' + str(self.sql))
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
hook.run(
self.sql,
autocommit=self.autocommit,
parameters=self.parameters)
Run Code Online (Sandbox Code Playgroud)
我们可以看到Hook封装了连接配置,而Operator提供了执行自定义查询的能力.
由于以下原因,使用不同的ORM来获取和处理数据库对象而不是原始SQL非常方便:
出于某种原因,在钩子和运算符方面没有在Airflow任务中使用ORM的示例.根据Django之外的Django数据库层使用?问题是,需要为数据库设置连接配置,然后在ORM中直接执行queires,但在适当的钩子/运算符之外执行此操作会破坏Airflow 原则.这就像用"python work_with_django_models.py"命令调用BashOperator .
那么这种情况下最好的实践是什么?我们是否共享Django ORM /其他ORM的钩子/操作符?为了使下面的代码真实(视为伪代码!):
import os
import django
os.environ.setdefault(
"DJANGO_SETTINGS_MODULE",
"myapp.settings"
)
django.setup()
from your_app import models
def get_and_modify_models(ds, **kwargs):
all_objects = models.MyModel.objects.filter(my_str_field = 'abc')
all_objects[15].my_int_field = 25
all_objects[15].save()
return list(all_objects)
django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')
Run Code Online (Sandbox Code Playgroud)
而不是在原始SQL中实现此功能.
我认为这是非常重要的话题,因为在这种情况下,基于ORM的框架和流程的整个流程无法潜入Airflow.
提前致谢!
Rya*_*ack 12
我同意我们应该继续讨论,因为访问Django ORM可以显着降低解决方案的复杂性.
我的方法是1)创建一个DjangoOperator
import os, sys
from airflow.models import BaseOperator
def setup_django_for_airflow():
# Add Django project root to path
sys.path.append('./project_root/')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
import django
django.setup()
class DjangoOperator(BaseOperator):
def pre_execute(self, *args, **kwargs):
setup_django_for_airflow()
Run Code Online (Sandbox Code Playgroud)
2)为逻辑/运算符扩展DjangoOperator,从访问ORM中获益
from .base import DjangoOperator
class DjangoExampleOperator(DjangoOperator):
def execute(self, context):
from myApp.models import model
model.objects.get_or_create()
Run Code Online (Sandbox Code Playgroud)
通过此策略,您可以区分使用Raw SQL/ORM的运算符.另请注意,对于Django运算符,所有django模型导入都需要在执行上下文中,如上所示.
归档时间: |
|
查看次数: |
2239 次 |
最近记录: |