尝试查询 mssql 数据库时出现气流 Fernet_Key 问题

gcl*_*jr5 5 python docker airflow python-cryptography

我对 Airflow 很陌生。我已经多次通读文档,在网上阅读了许多 S/O 问题和许多随机文章,但尚未解决此问题。我有一种感觉,我做错了一些非常简单的事情。我有适用于 Windows 的 Docker,我拉取了puckel/docker-airflow映像并运行了一个暴露端口的容器,这样我就可以从我的主机访问 UI。我有另一个容器在运行mcr.microsoft.com/mssql/server,我在其中恢复了 WideWorldImporters 示例数据库。从 Airflow UI,我已经能够成功地创建到这个数据库的连接,甚至可以从数据分析部分查询它。检查下面的图像: 连接创建 成功查询到连接

因此,虽然这有效,但我的 dag 在第二个任务中失败了sqlData。这是代码:

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from datetime import timedelta, datetime

copyData = DAG(
    dag_id='copyData',
    schedule_interval='@once',
    start_date=datetime(2019,1,1)
)


printHelloBash = BashOperator(
    task_id = "print_hello_Bash",
    bash_command = 'echo "Lets copy some data"',
    dag = copyData
)

mssqlConnection = "WWI"
sqlData = MsSqlOperator(sql="select top 100 InvoiceDate, TotalDryItems from sales.invoices",
                       task_id="select_some_data",
                       mssql_conn_id=mssqlConnection,
                       database="WideWorldImporters",
                       dag = copyData,
                       depends_on_past=True
          )

queryDataSuccess = BashOperator(
    task_id = "confirm_data_queried",
    bash_command = 'echo "We queried data!"',
    dag = copyData
)

printHelloBash >> sqlData >> queryDataSuccess
Run Code Online (Sandbox Code Playgroud)

最初的错误是:

*[2019-02-22 16:13:09,176] {{logging_mixin.py:95}} INFO - [2019-02-22 16:13:09,176] {{base_hook.py:83}} INFO - Using connection to: 172.17.0.3  
[2019-02-22 16:13:09,186] {{models.py:1760}} ERROR - Could not create Fernet object: Incorrect padding  
Traceback (most recent call last):  
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 171, in get_fernet
    _fernet = Fernet(fernet_key.encode('utf-8'))  
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 34, in __init__
    key = base64.urlsafe_b64decode(key)  
  File "/usr/local/lib/python3.6/base64.py", line 133, in urlsafe_b64decode
    return b64decode(s)  
  File "/usr/local/lib/python3.6/base64.py", line 87, in b64decode
    return binascii.a2b_base64(s)
binascii.Error: Incorrect padding*
Run Code Online (Sandbox Code Playgroud)

我注意到这与密码学有关,我继续运行pip install cryptographyand pip install airflow[crytpo],两者都返回完全相同的结果,告诉我要求已经得到满足。最后,我发现我只需要生成一个 fernet_key。我的airflow.cfg 文件中的默认键是fernet_key = $FERNET_KEY. 因此,我从容器中的 cli 运行:

python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
Run Code Online (Sandbox Code Playgroud)

并得到了我替换的代码$FERNET_KEY。我重新启动了容器并重新运行了 dag,现在我的错误是:

[2019-02-22 16:22:13,641] {{models.py:1760}} ERROR -   
Traceback (most recent call last):  
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 106, in _verify_signature
    h.verify(data[-32:])  
  File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/primitives/hmac.py", line 69, in verify
    ctx.verify(signature)  
  File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 73, in verify
    raise InvalidSignature("Signature did not match digest.")  
cryptography.exceptions.InvalidSignature: Signature did not match digest.
Run Code Online (Sandbox Code Playgroud)

从最初的加密文档扫描来看,哪个与兼容性有关?

我现在不知所措,决定问这个问题,看看我是否可能在解决这个问题时走错了路。任何帮助将不胜感激,因为 Airflow 看起来很棒。

gcl*_*jr5 3

感谢 @Tomasz 的一些侧面沟通,我终于让我的 DAG 开始工作了。他建议我尝试使用 docker-compose,它也在 puckel/docker-airflow github 存储库中列出。不过,我最终使用了 docker-compose-LocalExecutor.yml 文件而不是 Celery Executor。我还必须经历一些小的故障排除和更多配置。首先,我采用了现有的包含示例数据库的 MSSQL 容器,并使用docker commit mssql_container_name. 我这样做的唯一原因是为了节省恢复备份示例数据库的时间;如果需要,您可以随时将备份复制到容器中并在以后恢复它们。然后我将新映像添加到现有的 docker-compose-LocalExecutor.yml 文件中,如下所示:

version: '2.1'
services:
    postgres:
        image: postgres:9.6
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow

    mssql:
        image: dw:latest
        ports:
            - "1433:1433"

    webserver:
        image: puckel/docker-airflow:1.10.2
        restart: always
        depends_on:
            - postgres
            - mssql
        environment:
            - LOAD_EX=n
            - EXECUTOR=Local
        #volumes:
            #- ./dags:/usr/local/airflow/dags
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        ports:
            - "8080:8080"
        command: webserver
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3
Run Code Online (Sandbox Code Playgroud)

请注意,dw是我对基于 mssql 容器的新映像的命名。接下来,我将该文件重命名为docker-compose.yml,以便我可以轻松运行docker-compose up(不确定是否有命令直接指向不同的 YAML 文件)。一切启动并运行后,我导航到 Airflow UI 并配置我的连接。注意:由于您使用的是 docker-compose,因此您不需要知道其他容器的 IP 地址,因为它们使用我在此处找到的 DNS 服务 发现。然后,为了测试连接,我进入数据分析进行临时查询,但连接不存在。这是因为 puckel/docker-airflow 映像没有安装pymssql。因此,只需猛击容器docker exec -it airflow_webserver_container bash并安装它即可pip install pymssql --user。退出容器并使用 重新启动所有服务docker-compose restart。一分钟后,一切都启动并运行了。我的连接显示在即席查询中,我可以成功选择数据。最后,我打开了 DAG,调度程序接收到了它,一切都成功了!经过几周的谷歌搜索后,我松了一口气。感谢 @y2k-shubham 的帮助,并感谢 @Tomasz,在他在 r/datascience subreddit 上发表了有关 Airflow 的精彩而全面的文章后,我实际上最初联系了他。