小编Ace*_*rey的帖子

Airflow BashOperator收集返回代码

我刚刚开始使用Airflow,请耐心等待,我要做的是从BashOperator任务中收集返回代码并将其保存到本地变量,然后根据返回代码分支到另一个任务.我的问题是弄清楚如何让BashOperator返回一些东西.以下是我的代码段:

dag = DAG(dag_id='dag_1',
      default_args=default_args,
      schedule_interval='0 2 * * *',
      user_defined_macros=user_def_macros,
      dagrun_timeout=timedelta(minutes=60)
      )
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag)
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag)
t2.set_upstream(oodas)
Run Code Online (Sandbox Code Playgroud)

我正在尝试xcom_push但老实说不知道它是如何工作的..这是收集结果的正确方法吗?在日志中,最后一行是:命令退出,返回码为0.

python airflow

9
推荐指数
2
解决办法
6554
查看次数

气流以编程方式取消暂停?

我有一个dag,我们将部署到多个不同的气流实例和我们的airflow.cfg,dags_are_paused_at_creation = True但对于这个特定的dag,我们希望它打开,而不必通过单击UI手动执行.有没有办法以编程方式进行?

airflow apache-airflow airflow-scheduler

9
推荐指数
3
解决办法
4768
查看次数

获取所有气流叶节点/任务

我想构建一些我需要捕获所有叶子任务并为它们添加下游依赖项以在我们的数据库中完成作业的东西.有没有一种简单的方法可以在Airflow中找到DAG的所有叶节点?

python airflow apache-airflow

5
推荐指数
1
解决办法
589
查看次数

为Airflow添加额外的芹菜配置

谁知道我可以在哪里添加额外的芹菜配置到气流芹菜执行器?例如,我想http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-pool-restarts这个属性,但我如何允许额外的芹菜属性..

celery airflow

4
推荐指数
1
解决办法
2270
查看次数

Presto 自定义 UDF

我创建了一个已注册的自定义 udf,但是当我尝试选择 custom_udf(10) 时出现以下错误: Exact implementation of BasicPlatform do not match expected java types 这是我的 udf,我似乎无法弄清楚它有什么问题:public class ScalarUdfs {

private ScalarUdfs() {};

@ScalarFunction("basic_platform")
@SqlType(StandardTypes.VARCHAR)
public static Slice BasicPlatform(@SqlNullable @SqlType(StandardTypes.INTEGER) Integer id) {
    final Slice IOS = Slices.utf8Slice("iOS");
    final Slice ANDROID = Slices.utf8Slice("Android");
    final Slice WEB = Slices.utf8Slice("Web");
    final Slice OTHER = Slices.utf8Slice("Other");
    final Map<Integer, Slice> PLATFORM_MAP = new HashMap<Integer, Slice>() {{
        put(20, IOS);
        put(42, ANDROID);
        put(100, WEB);
    }};

    if (id == null || !PLATFORM_MAP.containsKey(id)) {
        return OTHER;
    }
    return PLATFORM_MAP.get(id); …
Run Code Online (Sandbox Code Playgroud)

java presto

3
推荐指数
1
解决办法
992
查看次数

成为条件/成为用户的条件

- name: Create database if database does not exist
  shell: "createdb -h {{ db_host }} -O analytics {{ database }}"
  become: yes
  become_user: analytics
  when:
    database_exists.stdout != "1"
Run Code Online (Sandbox Code Playgroud)

目前,我有一个像上面这样的游戏,但是我想要的是条件条件成为/ become_user属性。我希望这样,当ansible_env.USER ='analytics'设置为no,而当不是'analytics'时将其设置为yes。有人知道这是否可行吗?

ansible ansible-2.x

2
推荐指数
1
解决办法
242
查看次数

根据 Presto/Hive 中的列值聚合列

我正在尝试创建一个数据集,在其中我根据另一列的值聚合一列。举个例子

 id      | score | id2 | offensive 
---------+-------+-----+-----------
 a1      | 1.2   | 1   | false     
 a2      | 36.0  | 1   | true      
 a3      | 1.2   | 1   | true      
 a4      | 36.0  | 1   | false  
Run Code Online (Sandbox Code Playgroud)

我想要的是最终创建一个包含两个列表的数组,以按攻击性列和与其相关的 id 进行分组,并按分值对它们进行排序。类似于以下内容:

 id      |id2  | clean_group | offensive_group
---------+-----+-------------+-----------------
 a1      | 1   | [a2, a3]    | [a4, a1]
 a2      | 1   | [a2, a3]    | [a4, a1]
 a3      | 1   | [a2, a3]    | [a4, a1]
 a4      | 1   | [a2, a3] …
Run Code Online (Sandbox Code Playgroud)

hive presto hiveql

1
推荐指数
1
解决办法
4270
查看次数