kax*_*xil 10
您可以使用 Airflow CLI 将变量导出到文件,然后从 Python 代码中读取它。
airflow variables --export FILEPATH
Run Code Online (Sandbox Code Playgroud)
以编程方式,您可以使用BashOperator来实现此目的。
小智 8
我喜欢上面关于使用 Airflow CLI 的答案,但也可以从纯粹的 python 角度提取所有变量(所以不需要做奇怪的技巧来获取它)
使用这个代码片段:
from airflow.utils.db import create_session
from airflow.models import Variable
# a db.Session object is used to run queries against
# the create_session() method will create (yield) a session
with create_session() as session:
# By calling .query() with Variable, we are asking the airflow db
# session to return all variables (select * from variables).
# The result of this is an iterable item similar to a dict but with a
# slightly different signature (object.key, object.val).
airflow_vars = {var.key: var.val for var in session.query(Variable)}
Run Code Online (Sandbox Code Playgroud)
上述方法将查询Airflow sql数据库并返回所有变量。使用简单的字典理解将允许您将返回值重新映射到“正常”字典。
如果无法连接到正在运行的 Airflow 数据库实例,则会db.session.query引发。sqlalchemy.exc.OperationalError
如果您(无论出于何种原因)希望将 create_session 模拟为单元测试的一部分,可以使用以下代码片段:
from unittest import TestCase
from unittest.mock import patch, MagicMock
import contextlib
import json
mock_data = {
"foo": {
"bar": "baz"
}
}
airflow_vars = ... # reference to an output (dict) of aforementioned method
class TestAirflowVariables(TestCase)
@contextlib.contextmanager
def create_session(self):
"""Helper that mocks airflow.settings.Session().query() result signature
This is achieved by yielding a mocked airflow.settings.Session() object
"""
session = MagicMock()
session.query.return_value = [
# for the purpose of this test mock_data is converted to json where
# dicts are encountered.
# You will have to modify the above method to parse data from airflow
# correctly (it will send json objects, not dicts)
MagicMock(key=k, val=json.dumps(v) if isinstance(v, dict) else v)
for k, v in mock_data.items()
]
yield session
@patch("airflow.utils.db")
def test_data_is_correctly_parsed(self, db):
db.create_session = self.create_session
self.assertDictEqual(airflow_vars, mock_data)
Run Code Online (Sandbox Code Playgroud)
注意:您必须将补丁更改为create_session在引用的文件中导入方法的方式。我只有通过导入直到airflow.utils.db并调用db.create_session上述方法才能使其工作。
希望这可以帮助!祝你好运 :)
| 归档时间: |
|
| 查看次数: |
3090 次 |
| 最近记录: |