导出所有气流变量

Mad*_*ess 1 python airflow

我在从代码下载所有 Airflow 变量时遇到问题。
有机会从 UI 导出,但我还没有找到任何以编程方式进行导出的方法。

我发现了唯一Variable.get('variable_name')返回一个 Airflow 变量的方法。没有获取气流变量列表的变体。

在源代码中搜索也没有帮助。你知道一些简单的方法吗?

先感谢您。

kax*_*xil 10

您可以使用 Airflow CLI 将变量导出到文件,然后从 Python 代码中读取它。

airflow variables --export FILEPATH
Run Code Online (Sandbox Code Playgroud)

以编程方式,您可以使用BashOperator来实现此目的。


小智 8

我喜欢上面关于使用 Airflow CLI 的答案,但也可以从纯粹的 python 角度提取所有变量(所以不需要做奇怪的技巧来获取它)

使用这个代码片段:

from airflow.utils.db import create_session
from airflow.models import Variable

# a db.Session object is used to run queries against
# the create_session() method will create (yield) a session
with create_session() as session:
    # By calling .query() with Variable, we are asking the airflow db 
    # session to return all variables (select * from variables).
    # The result of this is an iterable item similar to a dict but with a 
    # slightly different signature (object.key, object.val).
    airflow_vars = {var.key: var.val for var in session.query(Variable)}
Run Code Online (Sandbox Code Playgroud)

上述方法将查询Airflow sql数据库并返回所有变量。使用简单的字典理解将允许您将返回值重新映射到“正常”字典。

如果无法连接到正在运行的 Airflow 数据库实例,则会db.session.query引发。sqlalchemy.exc.OperationalError

如果您(无论出于何种原因)希望将 create_session 模拟为单元测试的一部分,可以使用以下代码片段:

from unittest import TestCase
from unittest.mock import patch, MagicMock
import contextlib
import json

mock_data = {
    "foo": {
        "bar": "baz"
    }
}

airflow_vars = ...  # reference to an output (dict) of aforementioned method


class TestAirflowVariables(TestCase)
    @contextlib.contextmanager
    def create_session(self):
        """Helper that mocks airflow.settings.Session().query() result signature
        This is achieved by yielding a mocked airflow.settings.Session() object
        """
        session = MagicMock()
        session.query.return_value = [
            # for the purpose of this test mock_data is converted to json where 
            # dicts are encountered. 
            # You will have to modify the above method to parse data from airflow
            # correctly (it will send json objects, not dicts)
            MagicMock(key=k, val=json.dumps(v) if isinstance(v, dict) else v)
            for k, v in mock_data.items()
        ]
        yield session

    @patch("airflow.utils.db")
    def test_data_is_correctly_parsed(self, db):
        db.create_session = self.create_session
        self.assertDictEqual(airflow_vars, mock_data)
Run Code Online (Sandbox Code Playgroud)

注意:您必须将补丁更改为create_session在引用的文件中导入方法的方式。我只有通过导入直到airflow.utils.db并调用db.create_session上述方法才能使其工作。

希望这可以帮助!祝你好运 :)