Airflow DockerOperator volumes and mounts

Bar*_*art 3 docker airflow dockeroperator

We run Airflow with Docker Compose and have several active DAGs. Last week we upgraded Airflow to version 2.1.3. Since then, our DAGs that use DockerOperator fail with this error:

airflow.exceptions.AirflowException: Invalid arguments were passed to DockerOperator (task_id: t_docker). Invalid arguments were:
**kwargs: {'volumes':

I found this release note, which says:

The volumes parameter in airflow.providers.docker.operators.docker.DockerOperator and airflow.providers.docker.operators.docker_swarm.DockerSwarmOperator was replaced by the mounts parameter

So I changed my DAG from

t_docker = DockerOperator(
        task_id='t_docker',
        image='customimage:latest',
        container_name='custom_1',
        api_version='auto',
        auto_remove=True,
        volumes=['/home/airflow/scripts:/opt/airflow/scripts','/home/airflow/data:/opt/airflow/data'],
        docker_url='unix://var/run/docker.sock',
        network_mode='bridge',
        dag=dag
    )

to this

t_docker = DockerOperator(
        task_id='t_docker',
        image='customimage:latest',
        container_name='custom_1',
        api_version='auto',
        auto_remove=True,
        mounts=['/home/airflow/scripts:/opt/airflow/scripts','/home/airflow/data:/opt/airflow/data'],
        docker_url='unix://var/run/docker.sock',
        network_mode='bridge',
        dag=dag
    )

But now I get this error:

docker.errors.APIError: 500 Server Error for http+docker://localhost/v1.41/containers/create?name=custom_1: Internal Server Error ("json: cannot unmarshal string into Go struct field HostConfig.HostConfig.Mounts of type mount.Mount")

What am I doing wrong?

Ela*_*lad 8

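The change is not only in the parameter name; the mount syntax changed as well. The mounts parameter expects a list of docker.types.Mount objects rather than 'source:target' strings.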

You should replace

volumes=['/home/airflow/scripts:/opt/airflow/scripts','/home/airflow/data:/opt/airflow/data']

with:

mounts=[
    Mount(source="/home/airflow/scripts", target="/opt/airflow/scripts", type="bind"),
    Mount(source="/home/airflow/data", target="/opt/airflow/data", type="bind"),
]

So your code would be:

from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount

t_docker = DockerOperator(
    task_id='t_docker',
    image='customimage:latest',
    container_name='custom_1',
    api_version='auto',
    auto_remove=True,
    mounts=[
        Mount(source="/home/airflow/scripts", target="/opt/airflow/scripts", type="bind"),
        Mount(source="/home/airflow/data", target="/opt/airflow/data", type="bind"),
    ],
    docker_url='unix://var/run/docker.sock',
    network_mode='bridge',
    dag=dag
)
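
If a DAG has many old-style volume strings, the conversion can be scripted. The following is only a minimal sketch, not part of Airflow or docker-py: the helper name volume_spec_to_mount_kwargs is hypothetical, and it assumes every entry is a POSIX host path in 'source:target' or 'source:target:ro|rw' form, so that a bind mount is the right type. Windows paths with a drive-letter colon are not handled.

```python
from typing import Any, Dict


def volume_spec_to_mount_kwargs(spec: str) -> Dict[str, Any]:
    """Split 'source:target[:mode]' into keyword arguments for docker.types.Mount.

    Hypothetical helper for illustration; assumes host-path bind mounts.
    """
    parts = spec.split(":")
    if len(parts) not in (2, 3):
        raise ValueError(f"expected 'source:target[:mode]', got {spec!r}")
    source, target = parts[0], parts[1]
    mode = parts[2] if len(parts) == 3 else "rw"
    if mode not in ("ro", "rw"):
        raise ValueError(f"unsupported mode {mode!r} in {spec!r}")
    return {
        "source": source,
        "target": target,
        "type": "bind",
        "read_only": mode == "ro",
    }


old_volumes = [
    "/home/airflow/scripts:/opt/airflow/scripts",
    "/home/airflow/data:/opt/airflow/data",
]
mount_kwargs = [volume_spec_to_mount_kwargs(v) for v in old_volumes]

# With the docker package installed, the dictionaries can be turned into
# Mount objects and passed to DockerOperator(mounts=...):
#   from docker.types import Mount
#   mounts = [Mount(**kw) for kw in mount_kwargs]
```

Keeping the parsing separate from the Mount construction makes the helper testable without a Docker installation; the commented lines show where the real Mount objects would be built.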