小编Dim*_*iev的帖子

Airflow docker 命令通过 xCom 进行通信

我有一个 Airflow docker 容器和另外两个容器(dc1 和 dc2)。我正在尝试在任务 1 的 dc1 中执行命令(通过 DockerOperator),其输出将在任务 2 中的 dc2 命令中使用。

我有一个可行的解决方案,但不幸的是,它并不强大:(

我正在阅读 dc1 日志,99% 都有效

命令1.py

# a simple version of the real script 
print({'date': '2020-05-03'})
Run Code Online (Sandbox Code Playgroud)

气流/dags/dag1.py

# a wrapper class 
class DOperator(DockerOperator):
    def __init__(self, task_id, command, dag, *args, **kwargs):
        super().__init__(
            image='docker_image:latest',
            task_id=task_id,
            command=command,
            api_version='auto',
            auto_remove=True,
            docker_url='unix://var/run/docker.sock',
            network_mode='bridge',
            tty=True,
            xcom_push=True,
            dag=dag,
            *args,
            **kwargs
        )

    def execute(self, context):
        # get the last log line from docker stdout
        docker_log = super().execute(context)

        # push XComs from the json …
Run Code Online (Sandbox Code Playgroud)

python docker airflow

1
推荐指数
1
解决办法
3785
查看次数

标签 统计

airflow ×1

docker ×1

python ×1