我刚刚开始使用Airflow,请耐心等待,我要做的是从BashOperator任务中收集返回代码并将其保存到本地变量,然后根据返回代码分支到另一个任务.我的问题是弄清楚如何让BashOperator返回一些东西.以下是我的代码段:
dag = DAG(dag_id='dag_1',
default_args=default_args,
schedule_interval='0 2 * * *',
user_defined_macros=user_def_macros,
dagrun_timeout=timedelta(minutes=60)
)
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag)
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag)
t2.set_upstream(oodas)
Run Code Online (Sandbox Code Playgroud)
我正在尝试xcom_push但老实说不知道它是如何工作的..这是收集结果的正确方法吗?在日志中,最后一行是:命令退出,返回码为0.
我有一个dag,我们将部署到多个不同的气流实例和我们的airflow.cfg,dags_are_paused_at_creation = True
但对于这个特定的dag,我们希望它打开,而不必通过单击UI手动执行.有没有办法以编程方式进行?
我想构建一些我需要捕获所有叶子任务并为它们添加下游依赖项以在我们的数据库中完成作业的东西.有没有一种简单的方法可以在Airflow中找到DAG的所有叶节点?
谁知道我可以在哪里添加额外的芹菜配置到气流芹菜执行器?例如,我想http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-pool-restarts这个属性,但我如何允许额外的芹菜属性..
我创建了一个已注册的自定义 udf,但是当我尝试选择 custom_udf(10) 时出现以下错误:
Exact implementation of BasicPlatform do not match expected java types
这是我的 udf,我似乎无法弄清楚它有什么问题:public class ScalarUdfs {
private ScalarUdfs() {};
@ScalarFunction("basic_platform")
@SqlType(StandardTypes.VARCHAR)
public static Slice BasicPlatform(@SqlNullable @SqlType(StandardTypes.INTEGER) Integer id) {
final Slice IOS = Slices.utf8Slice("iOS");
final Slice ANDROID = Slices.utf8Slice("Android");
final Slice WEB = Slices.utf8Slice("Web");
final Slice OTHER = Slices.utf8Slice("Other");
final Map<Integer, Slice> PLATFORM_MAP = new HashMap<Integer, Slice>() {{
put(20, IOS);
put(42, ANDROID);
put(100, WEB);
}};
if (id == null || !PLATFORM_MAP.containsKey(id)) {
return OTHER;
}
return PLATFORM_MAP.get(id); …
Run Code Online (Sandbox Code Playgroud) - name: Create database if database does not exist
shell: "createdb -h {{ db_host }} -O analytics {{ database }}"
become: yes
become_user: analytics
when:
database_exists.stdout != "1"
Run Code Online (Sandbox Code Playgroud)
目前,我有一个像上面这样的游戏,但是我想要的是条件条件成为/ become_user属性。我希望这样,当ansible_env.USER ='analytics'设置为no,而当不是'analytics'时将其设置为yes。有人知道这是否可行吗?
我正在尝试创建一个数据集,在其中我根据另一列的值聚合一列。举个例子
id | score | id2 | offensive
---------+-------+-----+-----------
a1 | 1.2 | 1 | false
a2 | 36.0 | 1 | true
a3 | 1.2 | 1 | true
a4 | 36.0 | 1 | false
Run Code Online (Sandbox Code Playgroud)
我想要的是最终创建一个包含两个列表的数组,以按攻击性列和与其相关的 id 进行分组,并按分值对它们进行排序。类似于以下内容:
id |id2 | clean_group | offensive_group
---------+-----+-------------+-----------------
a1 | 1 | [a2, a3] | [a4, a1]
a2 | 1 | [a2, a3] | [a4, a1]
a3 | 1 | [a2, a3] | [a4, a1]
a4 | 1 | [a2, a3] …
Run Code Online (Sandbox Code Playgroud)