如何在 docker 镜像上执行完美的 Flow?

New*_*ler 7 etl docker dask docker-image prefect

我的目标:

我有一个构建的 docker 映像,并希望在该映像上运行我的所有流程。

现在:

我有以下任务正在本地 Dask 执行器上运行。运行代理的服务器与执行所需的环境是不同的 python 环境my_task- 因此需要在预构建映像中运行。

我的问题是: 如何在 Dask Executor 上运行此流程,以便它在我提供的 docker 映像(作为环境)上运行?

import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment


@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Docker!")


with Flow("My Flow") as flow:
    results = hello_task()

flow.environment = LocalEnvironment(
    labels=[], executor=LocalDaskExecutor(scheduler="threads", num_workers=2),
)
Run Code Online (Sandbox Code Playgroud)

我认为我需要首先在该 docker 映像上启动服务器和代理(如此处所述,但我想可以有一种方法可以简单地在提供的映像上运行 Flow。

更新1

按照教程,我尝试了以下操作:

import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment
from prefect.environments.storage import Docker


@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Docker!")


with Flow("My Flow") as flow:
    results = hello_task()

flow.storage = Docker(registry_url='registry.gitlab.com/my-repo/image-library')
flow.environment = LocalEnvironment(
    labels=[], executor=LocalDaskExecutor(scheduler="threads", num_workers=2),
)

flow.register(project_name="testing")
Run Code Online (Sandbox Code Playgroud)

但这创建了一个图像,然后将其上传到registry_url提供的位置。之后,当我尝试运行注册的任务时,它拉取了新创建的映像,并且该任务Submitted for execution现在停留在状态几分钟。

我不明白为什么它推了一个图像然后又拉了它?相反,我已经在此注册表上构建了一个映像,我想指定一个应用于任务执行的映像。

New*_*ler 3

我最终实现这一目标的方法如下:

  1. prefect server start在服务器上运行(即不在 docker 内部)。显然 docker-compose 在 docker 中不是一个好主意。
  2. prefect agent start在 docker 镜像内运行
  3. 确保 docker 镜像可以访问这些流(例如,通过在镜像和服务器之间安装共享卷)

你可以在这里查看我的答案的来源。