小编Mrg*_*8m4的帖子

如何以Unix用户身份运行Apache Airflow DAG

我使用root帐户在群集上安装了Apache Airflow 。我知道这是不好的做法,但这只是测试环境。我创建了一个简单的DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

dag = DAG('create_directory', description='simple create directory workflow', start_date=datetime(2017, 6, 1))

t1 = BashOperator(task_id='create_dir', bash_command='mkdir /tmp/airflow_dir_test', dag=dag)

t2 = BashOperator(task_id='create_file', bash_command='echo airflow_works > /tmp/airflow_dir_test/airflow.txt')

t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

问题是当我运行此作业时,root用户会执行它。我试图添加owner参数,但是它不起作用。气流说:

Broken DAG: [/opt/airflow/dags/create_directory.py] name 'user1' is not defined
Run Code Online (Sandbox Code Playgroud)

我的问题是,如何使用root用户以外的其他用户运行Apache Airflow DAG?

hadoop airflow apache-airflow

3
推荐指数
1
解决办法
3419
查看次数

如何添加天数(作为列的值)到目前为止?

我在Spark中添加天数(数字)到日期格式列时遇到问题.我知道有一个函数date_add需要两个参数 - 日期列和整数:

date_add(date startdate, tinyint/smallint/int days)
Run Code Online (Sandbox Code Playgroud)

我想使用整数类型的列值(不是整数本身).

说我有以下数据帧:

val data = Seq(
    (0, "2016-01-1"),
    (1, "2016-02-2"),
    (2, "2016-03-22"),
    (3, "2016-04-25"),
    (4, "2016-05-21"),
    (5, "2016-06-1"),
    (6, "2016-03-21"))
).toDF("id", "date")
Run Code Online (Sandbox Code Playgroud)

我可以简单地将整数添加到日期:

val date_add_fun = 
data.select(
    $"id",
    $"date",
    date_add($"date", 1)
)
Run Code Online (Sandbox Code Playgroud)

但我不能使用包含值的列表达式:

val date_add_fun = 
data.select(
    $"id",
    $"date",
    date_add($"date", $"id")
)
Run Code Online (Sandbox Code Playgroud)

它给出了错误:

<console>:60: error: type mismatch;
 found   : org.apache.spark.sql.ColumnName
 required: Int
           date_add($"date", $"id")
Run Code Online (Sandbox Code Playgroud)

有谁知道是否可以使用列是date_add函数?或者解决方法是什么?

scala apache-spark apache-spark-sql

3
推荐指数
1
解决办法
2631
查看次数

使用pytest测试Spark-无法在本地模式下运行Spark

我正在尝试从此站点使用pytest运行单词计数测试-使用py.test 对Apache Spark进行单元测试。问题是我无法启动spark上下文。我用于运行Spark Context的代码:

@pytest.fixture(scope="session")
def spark_context(request):
    """ fixture for creating a spark context
    Args:
        request: pytest.FixtureRequest object
    """
    conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
    sc = SparkContext(conf=conf)
    request.addfinalizer(lambda: sc.stop())

    quiet_py4j()
    return sc
Run Code Online (Sandbox Code Playgroud)

我使用命令执行以下代码:

#first way
pytest spark_context_fixture.py

#second way
python spark_context_fixture.py
Run Code Online (Sandbox Code Playgroud)

输出:

platform linux2 -- Python 2.7.5, pytest-3.0.4, py-1.4.31, pluggy-0.4.0
rootdir: /home/mgr/test, inifile:
collected 0 items
Run Code Online (Sandbox Code Playgroud)

然后我想使用pytest运行wordcount测试。

pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
    """ test word couting
    Args:
        spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello …
Run Code Online (Sandbox Code Playgroud)

python pytest apache-spark pyspark

2
推荐指数
1
解决办法
2959
查看次数

Oracle PL/SQL - 不是有效月份错误

我正在尝试使用Oracle站点的这个PL/SQL脚本创建日历表: 日期维度数据生成器

一切都很好,但我不能从第5节开始说明:

SELECT *
FROM TABLE(UDF_CALENDAR_GENERATOR(CAST('1-JAN-2016' AS DATE), CAST('31-DEC-2016' AS DATE)));
Run Code Online (Sandbox Code Playgroud)

我收到错误:

  1. 00000 - "不是有效月份"

*原因:
*行动:

我尝试使用to_date函数(基于这个答案):

SELECT *
FROM TABLE(UDF_CALENDAR_GENERATOR(TO_DATE('14-Apr-2015', 'DD-MON-YYYY'), TO_DATE('14-May-2015', 'DD-MON-YYYY')));
Run Code Online (Sandbox Code Playgroud)

但它不起作用.这个陈述有什么问题?

oracle plsql

1
推荐指数
2
解决办法
292
查看次数