J.C*_*man 18 python docker docker-compose airflow
I am using the following docker-compose file, which I took from: https://github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml
version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.0-python3.7}
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on: &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test:
        [
          "CMD-SHELL",
          'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"',
        ]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
  ######################################################
  # SPARK SERVICES
  ######################################################
  jupyterlab:
    image: andreper/jupyterlab:3.0.0-spark-3.0.0
    container_name: jupyterlab
    ports:
      - 8888:8888
      - 4040:4040
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: andreper/spark-master:3.0.0
    container_name: spark-master
    ports:
      - 8081:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8083:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
volumes:
  postgres-db-volume:
  shared-workspace:
    name: "jordi_airflow"
    driver: local
    driver_opts:
      type: "none"
      o: "bind"
      device: "/Users/jordicrespoguzman/Projects/custom_airflow_spark/spark_folder"
I am trying to run the following DAG:
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta
import csv
import requests
import json

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "admin@localhost.com",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def printar():
    print("success!")


with DAG(
    "forex_data_pipeline",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:
    downloading_rates = PythonOperator(task_id="test1", python_callable=printar)
    forex_processing = SparkSubmitOperator(
        task_id="spark1",
        application="/opt/airflow/dags/test.py",
        conn_id="spark_conn",
        verbose=False,
    )
    downloading_rates >> forex_processing
But I see this error in the Airflow UI:
Broken DAG: [/opt/airflow/dags/dag_spark.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/dag_spark.py", line 7, in <module>
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
ModuleNotFoundError: No module named 'airflow.providers.apache'
I have specified the additional requirements to install in the docker-compose file:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
Did I write it wrong? How should I specify additional requirements to install in Airflow? Can I pass a requirements.txt? If so, how do I specify the path?
小智 26
I use:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- package1 package2 package3 }

# Example
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-apache-spark}

# or:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-oracle apache-airflow-providers-microsoft-mssql}
(Note: add one space.)
Then, when the Airflow Docker container first runs Python, it automatically pip-installs the required packages.
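On the requirements.txt part of the question: the variable shown here holds a space-separated package list, not a file path. A minimal sketch of flattening a requirements file into that form follows. The function name and the sample text are made up for illustration, and the parsing is deliberately simple: it assumes one requirement per line with no spaces inside a requirement.

```python
def requirements_to_env_value(requirements_text):
    """Flatten requirements.txt content into one space-separated string."""
    packages = []
    for raw_line in requirements_text.splitlines():
        line = raw_line.strip()
        # Skip blank lines, full-line comments, and pip options such as "-r other.txt".
        if not line or line.startswith("#") or line.startswith("-"):
            continue
        # Drop a trailing inline comment ("pkg==1.0  # why it is pinned").
        line = line.split(" #", 1)[0].strip()
        packages.append(line)
    return " ".join(packages)


# Hypothetical requirements content, used only to show the transformation.
example = """
# providers
apache-airflow-providers-apache-spark
lxml==4.6.3  # pinned
"""

print("_PIP_ADDITIONAL_REQUIREMENTS=" + requirements_to_env_value(example))
```

The printed line could be placed in a .env file next to docker-compose.yaml, because the compose file in the question reads the value through `${_PIP_ADDITIONAL_REQUIREMENTS:-...}`.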
Pinned versions can be listed as well, for example: lxml==4.6.3 charset-normalizer==1.4.1. Available in Airflow image 2.1.1 and later.
mik*_*laj 11
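Support for the _PIP_ADDITIONAL_REQUIREMENTS environment variable has not been released yet. Only the development (unreleased) version of the Docker image supports it. The feature is planned for Airflow 2.1.1. For more information, see: Adding extra requirements for build and runtime of the PROD image.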
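As a rough way to tell which case applies, the image tag can be compared with the 2.1.1 cut-off mentioned in this thread. The sketch below is only an illustration: the function name is made up, and it assumes the tag starts with a plain `major.minor.patch` version, as the tags in this question do.

```python
import re

# Cut-off taken from this thread: the variable is honoured from image 2.1.1 on.
MIN_SUPPORTED = (2, 1, 1)


def supports_pip_additional_requirements(image):
    """Return True/False from the image tag, or None if the tag has no version."""
    match = re.search(r":(\d+)\.(\d+)\.(\d+)", image)
    if match is None:
        # Tags such as "latest" carry no version number, so the answer is unknown.
        return None
    version = tuple(int(part) for part in match.groups())
    return version >= MIN_SUPPORTED


print(supports_pip_additional_requirements("apache/airflow:2.0.0-python3.7"))
print(supports_pip_additional_requirements("apache/airflow:2.1.1"))
```

For the image used in the question (2.0.0-python3.7) the check reports False, which matches the observed behaviour: the variable is silently ignored and the provider is never installed.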
For older versions, you should build a new image and set that image in docker-compose.yaml. This takes a few steps.
Create a new Dockerfile with the following content:
FROM apache/airflow:2.0.0
RUN pip install --no-cache-dir apache-airflow-providers-apache-spark
Build the new image:
docker build . --tag my-company-airflow:2.0.0
Then set this image in docker-compose.yaml; the file in the question reads the image name from the AIRFLOW_IMAGE_NAME variable.
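One way to point the compose file at the custom image, assuming it reads `${AIRFLOW_IMAGE_NAME:-...}` as the file in the question does, is to record the variable in a .env file next to docker-compose.yaml, which Docker Compose loads automatically. The helper below is a hypothetical sketch, not part of Airflow or Docker; it adds or replaces a single KEY=value line.

```python
from pathlib import Path


def set_env_var(env_path, key, value):
    """Add or replace KEY=value in a .env file, keeping all other lines."""
    path = Path(env_path)
    lines = path.read_text().splitlines() if path.exists() else []
    # Remove any earlier assignment of the same key so it is not duplicated.
    kept = [line for line in lines if not line.startswith(f"{key}=")]
    kept.append(f"{key}={value}")
    path.write_text("\n".join(kept) + "\n")
```

Calling `set_env_var(".env", "AIRFLOW_IMAGE_NAME", "my-company-airflow:2.0.0")` from the directory that holds docker-compose.yaml would make the services use the image built above the next time the stack is started.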
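For more information, see: the official guide on running Airflow in a docker-compose environment.
I especially recommend this fragment, which describes what to do when you need to install a new pip package.
ModuleNotFoundError: No module named 'XYZ'
The Docker Compose file uses the latest Airflow image (apache/airflow). If you need to install a new Python library or system library, you can customize and extend it.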
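To confirm whether a package actually made it into the image, a small check can be run with the Python interpreter inside one of the Airflow containers. This is a sketch using only the standard library; the function name is made up, and the module path is the one from the traceback in the question.

```python
import importlib.util


def is_importable(module_name):
    """Return True if Python can locate module_name in this environment."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # find_spec imports parent packages first; a missing parent raises here.
        return False


# Module path taken from the error message in the question.
print(is_importable("airflow.providers.apache.spark"))
```

If this prints False in the scheduler or worker container, the provider was not installed there and the DAG import will keep failing with the same ModuleNotFoundError.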
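I recommend that you check the guide on building the Docker image. It explains how to install more complex dependencies.
I also recommend using only the Docker Compose file from the official website that matches your specific Airflow version. Docker Compose files from newer versions may not work with older versions of Airflow, because we keep making many improvements to these files to increase stability, reliability, and user experience.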