如何向Airflow添加新的连接类型?

Nic*_*ick 5 airflow

看一下插件文档(https://airflow.incubator.apache.org/plugins.html),很清楚如何添加到新的钩子和操作符,但我添加了一个需要连接信息的新钩子.这些信息似乎是硬编码的airflow/models.py.有没有办法在不改变Airflow源代码的情况下将自己的连接类型添加到列表中?

Han*_* Ko 3

气流连接的 conn_type 字段允许 null 值。因此,如果您不关心为自定义挂钩提供唯一的类型名称,那么您可以在挂钩实现中提供默认连接值。

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.utils.db import provide_session 

class MyHook(BaseHook):

    # ... impl whatever you want.        

    @classmethod
    @provide_session
    def get_hook(cls, conn_id='myhook_default', session=None):
        try:
            conn = cls.get_connection(conn_id)
        except AirflowException:  
            # create default connection. run only once.
            conn = Connection(
                conn_id=conn_id,
                # conn_type='string500',  # You can give new type string here. But no UI component's for you. Just leave it.
                host='default.example.com',
                port=80,
                login='default_login',
                password='default_pw',
                extra=json.dumps({...extra_defult_you_need...}),
            )
            session.add(conn)
            session.commit()
    return MyHook(conn=conn)
Run Code Online (Sandbox Code Playgroud)