损坏的DAG:[/ airflow / dags / a.py]无法为登录名解密`extra`参数=无,缺少FERNET_KEY配置

jac*_*ack 5 python airflow

我在没有FERNET_KEY的情况下启动了Airflow。意识到这一点后,我将执行以下操作:https : //airflow.apache.org/configuration.html#connections

pip install apache-airflow[crypto]

from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key)
Run Code Online (Sandbox Code Playgroud)

拿了钥匙并将其放入airflow.cfg然后调用airflow initdb,但是错误仍然出现。

我究竟做错了什么?

当我做:

airflow webserver -D
Run Code Online (Sandbox Code Playgroud)

我得到:

  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 713, in extra_dejson
    if self.extra:
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/attributes.py", line 293, in __get__
    return self.descriptor.__get__(instance, owner)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 632, in get_extra
    return fernet.decrypt(bytes(self._extra, 'utf-8')).decode()
  File "/usr/lib/python2.7/dist-packages/cryptography/fernet.py", line 101, in decrypt
    raise InvalidToken
Run Code Online (Sandbox Code Playgroud)

日志表明此代码存在问题:

def get_conn(conn_id, session=None):
    conn = (session.query(Connection)
                   .filter(Connection.conn_id == conn_id)
                   .first())
    return conn


def my_python_function():
   conn = get_conn('s3connection')
   key_id = conn.extra_dejson.get('aws_access_key_id')
   secret_key = conn.extra_dejson.get('aws_secret_access_key')
   default_region = conn.extra_dejson.get('region_name')
   return key_id,secret_key,default_region
Run Code Online (Sandbox Code Playgroud)

dla*_*lin 6

气流通常会为您生成一个。

下面是一个例子:

$ python
>>> from cryptography.fernet import Fernet
>>> k=Fernet.generate_key()
>>> print(k)
Z6BkzaWcF7r5cC-VMAumjpBpudSyjGskQ0ObquGJhG0=
>>> ^D
$ $EDITOR $AIRFLOW_HOME/airflow.cfg
Run Code Online (Sandbox Code Playgroud)

有变化:

# Secret key to save connection passwords in the db
fernet_key = cryptography_not_found_storing_passwords_in_plain_text
Run Code Online (Sandbox Code Playgroud)

到:

# Secret key to save connection passwords in the db
fernet_key = Z6BkzaWcF7r5cC-VMAumjpBpudSyjGskQ0ObquGJhG0=
Run Code Online (Sandbox Code Playgroud)

检查它是否按预期设置(或者每次都会随机生成一个)

$ python
Python 2.7.13 (default, Jul 18 2017, 09:17:00)
[GCC 4.2.1 Compatible Apple LLVM 8.1.0 (clang-802.0.42)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow import configuration as conf
[2018-06-14 17:53:36,200] {__init__.py:57} INFO - Using executor SequentialExecutor
>>> conf.get('core','fernet_key')
'Z6BkzaWcF7r5cC-VMAumjpBpudSyjGskQ0ObquGJhG0='
>>>
Run Code Online (Sandbox Code Playgroud)

上面应该是 v1.9.0 & v1.8.2 语法 [fixed],我已经用后者仔细检查过了。

每当您更改 fernet 密钥时,您都需要删除所有使用加密的连接和变量,因为它们将不再解密。

你可以重置你的数据库,但这可能是过度了。


小智 6

Airflow 使用 Fernet 加密后端数据库中其连接的所有密码。

在您的情况下,Airflow 后端正在使用以前的 fernet 密钥,并且您已经生成了一个用于创建新连接的密钥。

我的建议是做以下第一件事,

airflow resetdb
Run Code Online (Sandbox Code Playgroud)

这将有助于删除后端数据库中的所有现有记录。

然后,

airflow initdb
Run Code Online (Sandbox Code Playgroud)

这将像新鲜一样初始化后端。

然后启动气流网络服务器和调度程序

airflow web server -p {port}
airflow scheduler
Run Code Online (Sandbox Code Playgroud)

然后在 UI 中创建 s3 的新连接(额外 - {"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"})

现在您应该能够通过以下解决方案来测试 s3 文件观察器 - Airflow s3 connection using UI


cwu*_*rtz 0

如果您在没有 fernet 密钥的情况下启动气流,则不会允许加密任何连接。

如果您有 fernet 密钥,添加/编辑任何连接,它们将使用该密钥加密。如果您以任何方式更改密钥,这些连接将无法解密。如果您当前使用不同的 fernet 密钥加密连接,唯一的解决方案是将使用一种密钥创建的密钥迁移到另一种密钥。或者只是删除它们并重新创建它们,假设您仍然手头有这些值。