Airflow Docker commands communicating through XCom

Dim*_*iev 1 python docker airflow

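I have an Airflow Docker container and two other containers (dc1 and dc2). In task 1 I am trying to run a command in dc1 (via DockerOperator) whose output will then be used by a command in dc2 in task 2.

I have a working solution, but unfortunately it is not robust :(

I read the dc1 log, and that works 99% of the time.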

command1.py

import json

# a simplified version of the real script: print the result as JSON on stdout
print(json.dumps({'date': '2020-05-03'}))

airflow/dags/dag1.py

import json

from airflow.operators.docker_operator import DockerOperator

# a wrapper class
class DOperator(DockerOperator):
    def __init__(self, task_id, command, dag, *args, **kwargs):
        super().__init__(
            image='docker_image:latest',
            task_id=task_id,
            command=command,
            api_version='auto',
            auto_remove=True,
            docker_url='unix://var/run/docker.sock',
            network_mode='bridge',
            tty=True,
            xcom_push=True,
            dag=dag,
            *args,
            **kwargs
        )

    def execute(self, context):
        # get the last log line from docker stdout
        docker_log = super().execute(context)

        # push XComs from the json
        if docker_log:
            try:
                result = json.loads(docker_log)

                for key in result.keys():
                    context['ti'].xcom_push(key=key, value=result[key])
            except (ValueError, AttributeError):
                # the log line is not a JSON object (e.g. it is empty): push nothing
                pass

        return docker_log

# Docker container 1
task1 = DOperator(
    dag=dag,
    task_id='task1',
    command='python command1.py',  # prints {"date": "2020-05-03"}
)

# Docker container 2
task2 = DOperator(
    dag=dag,
    task_id='task2',
    command='python command2.py --date={}'.format(
        "{{{{ task_instance.xcom_pull(dag_id='{}', task_ids='{}', key='{}') }}}}".format(
            dag.dag_id,
            task1.task_id,
            'date'
        )
    ),
)

task1 >> task2
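
The nested `.format` calls in `task2` can be hard to read. As a minimal sketch, the helper below (`xcom_pull_template` is a hypothetical name, not part of Airflow) builds the same Jinja expression; the DAG id `dag1` is an assumption for illustration. In `str.format`, `{{` and `}}` are literal braces, so four braces on each side leave the two that Jinja expects.

```python
def xcom_pull_template(dag_id, task_id, key):
    """Return a Jinja expression that pulls one XCom value at render time."""
    # '{{{{' and '}}}}' collapse to '{{' and '}}' after str.format
    return (
        "{{{{ task_instance.xcom_pull(dag_id='{}', task_ids='{}', key='{}') }}}}"
        .format(dag_id, task_id, key)
    )


# 'dag1' is an assumed DAG id; 'task1' and 'date' come from the DAG above
command = 'python command2.py --date={}'.format(
    xcom_pull_template('dag1', 'task1', 'date')
)
print(command)
```

The resulting string is what Airflow would render as a template when the task runs, since `command` is a templated field of DockerOperator.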

dc1 log

[2020-08-31 06:50:38,868] {{docker_operator.py:242}} INFO - {"date": "2020-05-03"}

But 1% of the time it does not work.

In that case the dc1 log contains an extra empty line, and I cannot extract the output correctly.

dc1 log

[2020-08-31 06:50:38,868] {{docker_operator.py:242}} INFO - {"date": "2020-05-03"}
[2020-08-31 06:50:38,868] {{docker_operator.py:242}} INFO - 

So my questions are:

  • Do you know how to fix this?
  • Or do you know a better way for two Docker operators to communicate?
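
One possible way to tolerate the extra empty line, sketched under the assumption that the container prints exactly one JSON object as its last non-empty line of output, is to skip blank lines from the end before parsing. `parse_last_json_line` is a hypothetical helper, not part of Airflow or DockerOperator.

```python
import json


def parse_last_json_line(output):
    """Parse the last non-empty line of `output` as a JSON object.

    Returns an empty dict if there is no such line or it is not a JSON object.
    """
    if isinstance(output, bytes):
        output = output.decode('utf-8', errors='replace')
    # walk backwards so trailing blank lines (or stray whitespace) are skipped
    for line in reversed(output.splitlines()):
        line = line.strip()
        if not line:
            continue
        try:
            result = json.loads(line)
        except ValueError:
            return {}
        return result if isinstance(result, dict) else {}
    return {}


print(parse_last_json_line('{"date": "2020-05-03"}\n\n'))
```

This only helps if the full output reaches the helper. If the operator returns just the final (empty) chunk, there is nothing left to parse; in that case it may be necessary to have the operator return the whole log (the `xcom_all` flag is intended for that, though its behavior may differ between Airflow versions).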

Mat*_*amp 6

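The following should work for what you are doing. My solution uses XCom traffic to pass data between containers. It does require a task between the dockerized tasks so that the XCom can be pulled. Below is the example DAG I use to show others how to pass XCom traffic into and out of dockerized Airflow tasks. I know it works on Airflow 1.10.6. I just upgraded to 1.10.12 and am having some problems with XCOM_JSON not being passed to the container. Hope this helps... if you have questions, ask and I will do my best to elaborate.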

Example DAG using Docker

example_docker_dag.py

import time
import logging
import datetime
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, Variable, XCom
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.dates import days_ago
import ast


logger = logging.getLogger(__name__)

args = {
        'start_date': days_ago(1),
        'owner': 'mjcamp'
        }

dag = DAG(
        dag_id ='example-docker-dag',
        default_args=args,
        max_active_runs=6,
        schedule_interval='@daily'
        )

def make_xcom_traffic(**context):
    print("some task that does things..")
    time.sleep(2)
    data = dict(test="Pass some data to the next task or tasks.", number=545)
    Variable.set("Example-XCom", data)
    return data  # conventional way of doing XCom; returning isn't strictly needed, but kept for uniformity


def read_xcom(**context):
    print("Read XCOM from DockerOperator.. and maybe do things..")
    time.sleep(1)
    # Grab xcom from previous task(dockerized task)..
    data = context['task_instance'].xcom_pull(task_ids=context['task'].upstream_task_ids, key='return_value')
    # Airflow seems to be broken and will return all of stdout as the XCom traffic, so we have to parse it or
    # write our code to only `print` the serialized thing we want; in this case we are just printing a dictionary.
    # If you do something like this and have string data in your return value, either don't use
    # newlines in your strings or choose a different way to break things up.

    xcom_data = data[0].split('\n')[-1]
    print("THIS IS WHAT I WANT:", xcom_data)
    xcom_data = ast.literal_eval(xcom_data)
    # Showing we have a python dictionary now...
    print("Int =>", xcom_data["Int"])
    print("Str =>", xcom_data["Str"])
    print("Float =>", xcom_data["Float"])


t1 = PythonOperator(
        task_id='make-xcom-traffic',
        provide_context=True,
        python_callable=make_xcom_traffic,
        dag=dag)

t2 = DockerOperator(
        environment={
            "Example-XCom" : Variable.get("Example-XCom"),
            },
        task_id="docker-text",
        image="example-docker-task",
        auto_remove=True,
        xcom_push=True,
        xcom_all=False, # <<<=== things are broken in Airflow and this doesn't do what you expect.. it does nothing
        docker_url='unix:///var/run/docker.sock',
        # docker_conn_id='containeryard',
        api_version='auto',
        dag=dag
        )

t3 = PythonOperator(
        task_id='var-test',
        provide_context=True,
        python_callable=read_xcom,
        dag=dag
        )

t1 >> t2 >> t3

Dockerized task

Dockerfile

FROM python:3.6

ENV EXAMPLE_XCOM=not-set

COPY ./requirements.txt /tmp/requirements.txt

RUN pip install -r /tmp/requirements.txt

COPY ./code.py /tmp/code.py

CMD ["/usr/local/bin/python", "/tmp/code.py"]

requirements.txt

pandas==0.24.2
numpy==1.17.1

code.py
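
The DAG's `read_xcom` task expects the container to print a dictionary with `Int`, `Str`, and `Float` keys as its last line of stdout. As a rough sketch only (not the author's script), a `code.py` compatible with that could look like the following. The helper name `build_result` and the values it produces are assumptions; only the key names and the `Example-XCom` environment variable name come from the DAG.

```python
import ast
import os


def build_result(raw):
    """Build the dictionary the downstream task parses from stdout."""
    try:
        # the Variable value is a Python-literal string such as "{'number': 545}"
        incoming = ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        incoming = {}
    if not isinstance(incoming, dict):
        incoming = {}
    number = incoming.get('number', 0)
    if not isinstance(number, (int, float)):
        number = 0
    # illustrative values only; the keys match what read_xcom looks up
    return {
        'Int': int(number),
        'Str': str(incoming.get('test', '')),
        'Float': float(number) / 2,
    }


if __name__ == '__main__':
    # "Example-XCom" is the name passed via `environment` in the DAG
    raw_value = os.environ.get('Example-XCom', 'not-set')
    print("received:", raw_value)
    # keep the dictionary as the LAST printed line so split('\n')[-1] finds it
    print(build_result(raw_value))
```

Printing the dictionary last matters because the downstream task takes only the final line of the captured output and passes it to `ast.literal_eval`.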


Airflow Variable

Key: Example-XCom
Value: {'test': 'Pass some data to the next task or tasks.', 'number': 545}
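
The single quotes in the stored value suggest it is the `str()` of the Python dictionary rather than JSON, which is consistent with the DAG calling `Variable.set` with a plain dict. A small standard-library sketch of the consequence: such a string can be read back with `ast.literal_eval` but not with `json.loads`. The names `stored` and `parsed_as_json` are just for illustration.

```python
import ast
import json

data = {'test': 'Pass some data to the next task or tasks.', 'number': 545}
stored = str(data)  # what a plain string conversion of the dict looks like

# ast.literal_eval understands Python literals, including single-quoted strings
assert ast.literal_eval(stored) == data

# json.loads rejects the same text because JSON requires double quotes
try:
    json.loads(stored)
    parsed_as_json = True
except ValueError:
    parsed_as_json = False
print(parsed_as_json)

# serializing with json.dumps instead gives text that json.loads can read back
assert json.loads(json.dumps(data)) == data
```

If JSON is preferred end to end, `Variable.set` accepts `serialize_json=True` and `Variable.get` accepts `deserialize_json=True`, which should avoid the need for `ast.literal_eval` on the consuming side.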