我有一个 Airflow docker 容器和另外两个容器(dc1 和 dc2)。我正在尝试在任务 1 的 dc1 中执行命令(通过 DockerOperator),其输出将在任务 2 中的 dc2 命令中使用。
我有一个可行的解决方案,但不幸的是,它并不强大:(
我正在阅读 dc1 日志,99% 都有效
命令1.py
# a simple version of the real script
print({'date': '2020-05-03'})
Run Code Online (Sandbox Code Playgroud)
气流/dags/dag1.py
# a wrapper class
class DOperator(DockerOperator):
def __init__(self, task_id, command, dag, *args, **kwargs):
super().__init__(
image='docker_image:latest',
task_id=task_id,
command=command,
api_version='auto',
auto_remove=True,
docker_url='unix://var/run/docker.sock',
network_mode='bridge',
tty=True,
xcom_push=True,
dag=dag,
*args,
**kwargs
)
def execute(self, context):
# get the last log line from docker stdout
docker_log = super().execute(context)
# push XComs from the json …Run Code Online (Sandbox Code Playgroud)