小编Chr*_*eck的帖子

启动 Airflow 网络服务器失败并显示 sqlalchemy.exc.NoInspectionAvailable:没有可用的检查系统

安装正确。db 正确启动并尝试启动网络服务器显示以下错误。

我重新安装了所有东西,但它仍然无法正常工作。

如果有人帮助我,我将不胜感激。

控制台输出:

$:~/airflow# airflow webserver -p 8080
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-04-08 13:14:20,573] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-04-08 13:14:20,574] {dagbag.py:403} INFO - Filling up the DagBag from /home/cato_service/airflow/dags
Traceback (most recent call last):
  File "/usr/local/bin/airflow", …
Run Code Online (Sandbox Code Playgroud)

airflow

38
推荐指数
2
解决办法
6880
查看次数

在运行时通过气流导出环境变量

我目前正在将之前在bash脚本中实现的工作流转换为Airflow DAG。在bash脚本中,我只是在运行时使用以下命令导出变量

export HADOOP_CONF_DIR="/etc/hadoop/conf"
Run Code Online (Sandbox Code Playgroud)

现在我想在Airflow中做同样的事情,但是还没有找到解决方案。我发现的一种解决方法是在os.environ[VAR_NAME]='some_text'任何方法或运算符外部设置变量,但这意味着在脚本加载后而不是在运行时将它们导出。

现在,当我尝试调用os.environ[VAR_NAME] = 'some_text'由PythonOperator 调用的函数时,它不起作用。我的代码看起来像这样

def set_env():
    os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
    os.environ['PATH'] = "somePath:" + os.environ['PATH']
    os.environ['SPARK_HOME'] = "pathToSparkHome"
    os.environ['PYTHONPATH'] = "somePythonPath"
    os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
    os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()

set_env_operator = PythonOperator(
    task_id='set_env_vars_NOT_WORKING',
    python_callable=set_env,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

现在,当我的SparkSubmitOperator被执行时,我得到了异常:

Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
Run Code Online (Sandbox Code Playgroud)

我使用的情况下,这是相关的是,我有SparkSubmitOperator,我在那里作业提交到纱,因此无论是HADOOP_CONF_DIRYARN_CONF_DIR必须在环境中进行设置。.bashrc对我来说,在我或其他任何配置中设置它们都是遗憾的,这就是为什么我需要在运行时设置它们。

最好在执行之前SparkSubmitOperator,先在操作员中设置它们,但如果有可能将它们作为参数传递给SparkSubmitOperator,那至少是可以的。

python environment-variables apache-spark airflow

6
推荐指数
1
解决办法
2482
查看次数

具有 RBAC 功能的 Airflow LDAP 身份验证

我正在尝试使用 RBAC 功能启用 Airflow LDAP 身份验证,并进行了以下更改:

  1. 从airflow.cfg 中删除了LDAP 部分
  2. 修改airflow.cfg:在[webserver]部分下 添加rbac = true和删除authentication = True
  3. AIRFLOW_HOME目录下 创建 webserver_config.py 文件

webserver_config.py 文件包含:

import os

from airflow import configuration as conf

from flask_appbuilder.security.manager import AUTH_LDAP

basedir = os.path.abspath(os.path.dirname(__file__))

SQLALCHEMY_DATABASE_URI = conf.get(‘core’, ‘SQL_ALCHEMY_CONN’)

CSRF_ENABLED = True

AUTH_TYPE = AUTH_LDAP

AUTH_ROLE_ADMIN = ‘Admin’

AUTH_USER_REGISTRATION = True

AUTH_USER_REGISTRATION_ROLE = “Admin”

AUTH_LDAP_SERVER = ‘ldaps://ldap.xxx.yyy.net:636‘

AUTH_LDAP_SEARCH = “ou=Users,o=corp”

AUTH_LDAP_BIND_USER = ‘cn=ldap-proxy,ou=Users,o=corp’

AUTH_LDAP_BIND_PASSWORD = ‘YOUR_PASSWORD’

AUTH_LDAP_UID_FIELD = ‘uid’

AUTH_LDAP_USE_TLS = False

AUTH_LDAP_ALLOW_SELF_SIGNED = …
Run Code Online (Sandbox Code Playgroud)

ldap rbac airflow flask-appbuilder

6
推荐指数
1
解决办法
1686
查看次数

BranchPythonOperator 之后的 Airflow 任务不会失败并正确成功

在我的 DAG 中,我有一些只应在周六运行的任务。因此,我使用 BranchPythonOperator 在星期六的任务和 DummyTask 之间进行分支。之后,我加入两个分支并想要运行其他任务。

工作流程如下所示: 在此输入图像描述
这里我将dummy3的触发规则设置为'one_success',一切正常。

我遇到的问题是当 BranchPythonOperator 的上游出现故障时: 在此输入图像描述
BranchPythonOperator 和分支正确地具有状态'upstream_failed',但是连接分支的任务变为'skipped',因此显示了整个工作流程'success'

我尝试使用'all_success'作为触发规则,那么如果出现故障,整个工作流程都会失败,它会正常工作,但如果没有出现故障,则会跳过 dummy3。

我也尝试过'all_done'作为触发规则,如果没有失败,它会正常工作,但如果失败则 dummy3 仍然会被执行。

我的测试代码如下所示:

from datetime import datetime, date
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('test_branches',
          description='Test branches',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 1))


def python1():
    raise Exception('Test failure')
    # print 'Test success'


dummy1 = PythonOperator(
    task_id='python1',
    python_callable=python1,
    dag=dag
) …
Run Code Online (Sandbox Code Playgroud)

python airflow

4
推荐指数
1
解决办法
9130
查看次数