安装正确。db 正确启动并尝试启动网络服务器显示以下错误。
我重新安装了所有东西,但它仍然无法正常工作。
如果有人帮助我,我将不胜感激。
控制台输出:
$:~/airflow# airflow webserver -p 8080
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2020-04-08 13:14:20,573] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-04-08 13:14:20,574] {dagbag.py:403} INFO - Filling up the DagBag from /home/cato_service/airflow/dags
Traceback (most recent call last):
File "/usr/local/bin/airflow", …Run Code Online (Sandbox Code Playgroud) 我目前正在将之前在bash脚本中实现的工作流转换为Airflow DAG。在bash脚本中,我只是在运行时使用以下命令导出变量
export HADOOP_CONF_DIR="/etc/hadoop/conf"
Run Code Online (Sandbox Code Playgroud)
现在我想在Airflow中做同样的事情,但是还没有找到解决方案。我发现的一种解决方法是在os.environ[VAR_NAME]='some_text'任何方法或运算符外部设置变量,但这意味着在脚本加载后而不是在运行时将它们导出。
现在,当我尝试调用os.environ[VAR_NAME] = 'some_text'由PythonOperator 调用的函数时,它不起作用。我的代码看起来像这样
def set_env():
os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
os.environ['PATH'] = "somePath:" + os.environ['PATH']
os.environ['SPARK_HOME'] = "pathToSparkHome"
os.environ['PYTHONPATH'] = "somePythonPath"
os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()
set_env_operator = PythonOperator(
task_id='set_env_vars_NOT_WORKING',
python_callable=set_env,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
现在,当我的SparkSubmitOperator被执行时,我得到了异常:
Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
Run Code Online (Sandbox Code Playgroud)
我使用的情况下,这是相关的是,我有SparkSubmitOperator,我在那里作业提交到纱,因此无论是HADOOP_CONF_DIR或YARN_CONF_DIR必须在环境中进行设置。.bashrc对我来说,在我或其他任何配置中设置它们都是遗憾的,这就是为什么我需要在运行时设置它们。
最好在执行之前SparkSubmitOperator,先在操作员中设置它们,但如果有可能将它们作为参数传递给SparkSubmitOperator,那至少是可以的。
我正在尝试使用 RBAC 功能启用 Airflow LDAP 身份验证,并进行了以下更改:
rbac = true和删除authentication = TrueAIRFLOW_HOME目录下 创建 webserver_config.py 文件webserver_config.py 文件包含:
import os
from airflow import configuration as conf
from flask_appbuilder.security.manager import AUTH_LDAP
basedir = os.path.abspath(os.path.dirname(__file__))
SQLALCHEMY_DATABASE_URI = conf.get(‘core’, ‘SQL_ALCHEMY_CONN’)
CSRF_ENABLED = True
AUTH_TYPE = AUTH_LDAP
AUTH_ROLE_ADMIN = ‘Admin’
AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = “Admin”
AUTH_LDAP_SERVER = ‘ldaps://ldap.xxx.yyy.net:636‘
AUTH_LDAP_SEARCH = “ou=Users,o=corp”
AUTH_LDAP_BIND_USER = ‘cn=ldap-proxy,ou=Users,o=corp’
AUTH_LDAP_BIND_PASSWORD = ‘YOUR_PASSWORD’
AUTH_LDAP_UID_FIELD = ‘uid’
AUTH_LDAP_USE_TLS = False
AUTH_LDAP_ALLOW_SELF_SIGNED = …Run Code Online (Sandbox Code Playgroud) 在我的 DAG 中,我有一些只应在周六运行的任务。因此,我使用 BranchPythonOperator 在星期六的任务和 DummyTask 之间进行分支。之后,我加入两个分支并想要运行其他任务。
工作流程如下所示:

这里我将dummy3的触发规则设置为'one_success',一切正常。
我遇到的问题是当 BranchPythonOperator 的上游出现故障时:

BranchPythonOperator 和分支正确地具有状态'upstream_failed',但是连接分支的任务变为'skipped',因此显示了整个工作流程'success'。
我尝试使用'all_success'作为触发规则,那么如果出现故障,整个工作流程都会失败,它会正常工作,但如果没有出现故障,则会跳过 dummy3。
我也尝试过'all_done'作为触发规则,如果没有失败,它会正常工作,但如果失败则 dummy3 仍然会被执行。
我的测试代码如下所示:
from datetime import datetime, date
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator
dag = DAG('test_branches',
description='Test branches',
catchup=False,
schedule_interval='0 0 * * *',
start_date=datetime(2018, 8, 1))
def python1():
raise Exception('Test failure')
# print 'Test success'
dummy1 = PythonOperator(
task_id='python1',
python_callable=python1,
dag=dag
) …Run Code Online (Sandbox Code Playgroud)