我对 Docker + Airflow 很陌生。以下是我正在努力完成的工作。
我有 4 个服务,如下面的撰写文件所示。3 个与 Airflow 相关,一个作为测试 Ubuntu 实例。Airflow 相关容器:airflow-database, airflow-webserver,airflow-scheduler能够相互通信,我能够运行示例 DAG。现在我添加了第 4 个服务 (ubuntu),我尝试使用 DockerOperator 从 DAG 向其发送一个简单的命令“/bin/sleep 10”(下面是 DAG 文件)。但由于某种原因,我收到了 Permission Denied 消息(还附上了 DAG 错误文件)。
如果我从本地主机运行 Airflow 而不是从 docker 容器内部运行 Airflow,它会起作用无法弄清楚我缺少什么。以下是我尝试过的一些方法:
替换unix://var/run/docker.sock为tcp://172.20.0.1认为它可以通过 docker 主机 ip 解析
使用 gateway.host.internal
甚至从操作员中删除了 docker_url 选项,但意识到它无论如何都会默认为 unix://var/run/docker.sock
尝试了一堆组合,tcp://172.20.0.1:2376, tcp://172.20.0.1:2375
将主机端口映射到 Ubuntu,如 8085:8085 等。
docker version命令中发现了这一点,因此不得不使用 …我需要帮助将参数(xcom 从上一个任务推送)传递到 .sql 文件中的 SQL 查询。但是,我无法使用“参数”选项来执行此操作,即使此选项能够渲染上一个任务中的 xcom 值。让我知道我做错了什么。
谢谢 :)
start = EmptyOperator(
task_id="start",
)
fetch_cust_id = PythonOperator(
task_id = "fetch",
python_callable = lambda: 'C001',
)
update_orders = MySqlOperator(
task_id="update",
mysql_conn_id="mysql_default",
database="my_db",
sql="/update.sql",
parameters={
"custid": "{{ ti.xcom_pull(task_ids='fetch') }}"
}
)
start >> fetch_cust_id >> update_orders
Run Code Online (Sandbox Code Playgroud)
SQL文件(update.sql):
UPDATE orders
SET placed = 'yes'
WHERE
custid = {{ custid }}
;
Run Code Online (Sandbox Code Playgroud)
:(