无法安装 apache airflow 的附加要求

J.C*_*man 18 python docker docker-compose airflow

我正在使用以下 docker-compose 文件,它取自: https://github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml

# Compose file format version, kept as a string.
version: '3'
# Settings shared by every Airflow service; services pull them in with
# `<<: *airflow-common`.
x-airflow-common: &airflow-common
  # Overridable through AIRFLOW_IMAGE_NAME; the fallback pins Airflow 2.0.0.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.0-python3.7}
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    # Metadata DB and Celery result backend both point at the `postgres` service.
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    # Celery broker is the `redis` service.
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    # Quoted so the values stay strings instead of YAML booleans.
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    # Space-separated pip packages; defaults to the Spark provider.
    # NOTE(review): only image versions whose entrypoint implements this
    # variable will act on it; the pinned 2.0.0 default may predate that
    # support — confirm against the image's release notes.
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
  volumes:
    # Host directories next to this file, mounted into the Airflow home.
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  # UID:GID the containers run as; both overridable from the environment.
  user: '${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}'
  # Every Airflow service waits for healthy redis and postgres containers.
  depends_on: &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  # PostgreSQL 13 database; the Airflow connection strings in this file
  # target it as host `postgres` with the credentials set below.
  postgres:
    image: postgres:13
    restart: always
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      # Data directory lives on a named volume.
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      # Runs `pg_isready -U airflow` every 5s, up to 5 attempts.
      test:
        - CMD
        - pg_isready
        - '-U'
        - airflow
      interval: 5s
      retries: 5

  # Redis instance; AIRFLOW__CELERY__BROKER_URL points at it as host `redis`.
  redis:
    image: redis:latest
    ports:
      # Quoted so YAML always reads the HOST:CONTAINER mapping as a string
      # (unquoted xx:yy values can be parsed as base-60 numbers).
      - "6379:6379"
    healthcheck:
      # Runs `redis-cli ping` every 5s, up to 50 attempts.
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  # Airflow web server, published on host port 8080.
  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      # Quoted so YAML always reads the HOST:CONTAINER mapping as a string.
      - "8080:8080"
    healthcheck:
      # `curl --fail` exits non-zero on an HTTP error from /health.
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      # A local `depends_on` replaces the merged one, so the shared
      # dependencies are merged back in before adding airflow-init.
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  # Airflow scheduler; starts only after airflow-init has completed.
  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: always
    depends_on:
      # Re-merge the shared dependencies, then add airflow-init.
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    healthcheck:
      # `$$` is Compose's escape for a literal `$`, so HOSTNAME is resolved
      # by the shell inside the container, not when the file is loaded.
      test:
        - CMD-SHELL
        - 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5

  # Celery worker; starts only after airflow-init has completed.
  airflow-worker:
    <<: *airflow-common
    command: celery worker
    restart: always
    depends_on:
      # Re-merge the shared dependencies, then add airflow-init.
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    healthcheck:
      # Pings this container's own worker node; `$$` keeps HOSTNAME from
      # being interpolated by Compose.
      test:
        - CMD-SHELL
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5

  # One-shot container the long-running Airflow services wait on (they
  # depend on it with `service_completed_successfully`).
  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      # A local `environment` key replaces the merged one, so the shared
      # variables are merged back in explicitly.
      <<: *airflow-common-env
      # presumably read by the image entrypoint to upgrade the DB and create
      # the initial web user — confirm against the image documentation
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      # Credentials fall back to airflow/airflow when not set in the environment.
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  # Flower (started with `celery flower`), published on host port 5555.
  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      # Quoted so YAML always reads the HOST:CONTAINER mapping as a string.
      - "5555:5555"
    healthcheck:
      # `curl --fail` exits non-zero on an HTTP error from the root page.
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      # Re-merge the shared dependencies, then add airflow-init.
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  ######################################################
  # SPARK SERVICES
  ######################################################

  # JupyterLab container sharing /opt/workspace with the Spark nodes.
  jupyterlab:
    image: andreper/jupyterlab:3.0.0-spark-3.0.0
    container_name: jupyterlab
    ports:
      # Quoted so YAML always reads HOST:CONTAINER mappings as strings.
      - "8888:8888"
      # presumably the Spark application UI — confirm against the image docs
      - "4040:4040"
    volumes:
      - shared-workspace:/opt/workspace
  # Spark master node sharing /opt/workspace with JupyterLab and the workers.
  spark-master:
    image: andreper/spark-master:3.0.0
    container_name: spark-master
    ports:
      # Quoted so YAML always reads HOST:CONTAINER mappings as strings.
      # Container port 8080 is published on host 8081 because host 8080 is
      # already taken by airflow-webserver.
      - "8081:8080"
      - "7077:7077"
    volumes:
      - shared-workspace:/opt/workspace
  # First Spark worker: 1 core, 512 MB, started after spark-master.
  spark-worker-1:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      # Quoted so YAML always reads the HOST:CONTAINER mapping as a string.
      # Container port 8081 is published on host 8082.
      - "8082:8081"
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  # Second Spark worker: 1 core, 512 MB, started after spark-master.
  spark-worker-2:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      # Quoted so YAML always reads the HOST:CONTAINER mapping as a string.
      # Container port 8081 is published on host 8083 (8082 is worker 1).
      - "8083:8081"
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master

# Named volumes referenced by the services in this file.
volumes:
  # Holds the Postgres data directory; no driver options set.
  postgres-db-volume:
  # Local-driver volume configured with bind options, mounted at
  # /opt/workspace by the JupyterLab and Spark services.
  shared-workspace:
    name: 'jordi_airflow'
    driver: local
    driver_opts:
      type: 'none'
      o: 'bind'
      # NOTE(review): absolute, machine-specific host path — adjust per host.
      device: '/Users/jordicrespoguzman/Projects/custom_airflow_spark/spark_folder'
Run Code Online (Sandbox Code Playgroud)

我正在尝试运行以下 DAG:

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.email import EmailOperator

from datetime import datetime, timedelta
import csv
import requests
import json

# Arguments handed to the DAG below as its `default_args`.
default_args = dict(
    owner="airflow",
    # E-mail notifications are switched off for both failures and retries.
    email_on_failure=False,
    email_on_retry=False,
    email="admin@localhost.com",
    # One retry, five minutes after the failed attempt.
    retries=1,
    retry_delay=timedelta(minutes=5),
)


def printar():
    """Write a fixed success marker to stdout; returns None."""
    message = "success!"
    print(message)


# Daily DAG with two tasks: a Python callable, then a Spark job submission.
with DAG(
    dag_id="forex_data_pipeline",
    default_args=default_args,
    schedule_interval="@daily",
    start_date=datetime(2021, 1, 1),
    # Do not back-fill runs between start_date and now.
    catchup=False,
) as dag:

    # Task "test1": calls printar().
    print_success = PythonOperator(
        python_callable=printar,
        task_id="test1",
    )

    # Task "spark1": submits the script mounted under the DAGs folder.
    # NOTE(review): assumes an Airflow connection named "spark_conn" exists — confirm.
    submit_spark_job = SparkSubmitOperator(
        task_id="spark1",
        conn_id="spark_conn",
        application="/opt/airflow/dags/test.py",
        verbose=False,
    )

    # test1 runs first; spark1 is downstream of it.
    print_success.set_downstream(submit_spark_job)
Run Code Online (Sandbox Code Playgroud)

但我在 Airflow 用户界面中看到此错误:

Broken DAG: [/opt/airflow/dags/dag_spark.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/dag_spark.py", line 7, in <module>
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
ModuleNotFoundError: No module named 'airflow.providers.apache'
Run Code Online (Sandbox Code Playgroud)

我已指定在 docker-compose 文件中安装附加要求:

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
Run Code Online (Sandbox Code Playgroud)

我是不是写错了?我应该如何指定要在 Airflow 中安装的附加要求?我可以传递 requirements.txt 吗?如果可以,我该如何指定路径?

小智 26

我用的是:

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- package1 package2 package3 }

# Example
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-apache-spark}

# or:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-oracle apache-airflow-providers-microsoft-mssql}
Run Code Online (Sandbox Code Playgroud)

(注:在 `:-` 之后加 1 个空格)

然后,当 Airflow 的 Docker 容器第一次运行 Python 时,它会自动用 pip 安装所需的包。

[参考]


mik*_*laj 11

对环境变量 _PIP_ADDITIONAL_REQUIREMENTS 的支持尚未发布。仅 Docker 镜像的开发版/未发布版本支持它。该功能计划在 Airflow 2.1.1 中提供。有关更多信息,请参阅:为 PROD 映像的构建和运行时添加额外要求。

对于旧版本,您应该构建一个新映像,并在 docker-compose.yaml 中设置该映像。为此,您需要执行以下几个步骤。

  1. 创建一个新的 Dockerfile,内容如下:
    FROM apache/airflow:2.0.0
    RUN pip install --no-cache-dir apache-airflow-providers

    Run Code Online (Sandbox Code Playgroud)
  2. 构建新映像:
    docker build . --tag my-company-airflow:2.0.0

    Run Code Online (Sandbox Code Playgroud)
  3. 在 docker-compose.yaml 文件中设置此映像(该文件通过 AIRFLOW_IMAGE_NAME 变量读取映像名):
    echo "AIRFLOW_IMAGE_NAME=my-company-airflow:2.0.0" >> .env

    Run Code Online (Sandbox Code Playgroud)

更多信息请参阅: 关于在 docker-compose 环境中运行 Airflow 的官方指南

我特别推荐这个片段,它描述了当您需要安装新的 pip 包时要做什么。

ModuleNotFoundError:没有名为“XYZ”的模块

Docker Compose 文件使用最新的 Airflow 映像 (apache/airflow)。如果您需要安装新的Python库或系统库,您可以对其进行自定义和扩展。

我建议您查看有关构建 Docker Image的指南。这解释了如何安装更复杂的依赖项。

我还建议仅使用官方网站上针对特定版本的 Docker-compose 文件。新版本中的 Docker-compose 文件可能无法与旧版本的 Airflow 一起使用,因为我们一直在对这些文件进行许多改进,以提高稳定性、可靠性和用户体验。