气流在执行操作员后得到结果

Alg*_*g_D 3 python hive airflow

我已经配置了气流并创建了一些调用几个运算符的Dags和subDag.

我的麻烦是当操作员运行并完成工作时,我希望以某种python结构返回结果.例如:

File1.py

  ...
    ...
    sub_dag_one=SubDagOperator(subdag=subdag_accessHive(
PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, STEP, macros,path,
       ),
        task_id=DELP_DAG_NAME,
        dag=dag,
    )
Run Code Online (Sandbox Code Playgroud)

File2.py

  from airflow import DAG
    from airflow.operators import HiveOperator
def subdag_callHive(parent, child, args, step,
                         user_defined_macros, path
                        ):
        dag_subdag = DAG(
            dag_id='%s.%s' % (parent, child),
            default_args=args,
            schedule_interval="@daily",
            template_searchpath=path,
            user_defined_macros=user_defined_macros,
        )

        # some work...

        HiveOperator(
            task_id='some_id',
            hiveconf_jinja_translate=True,
            hql='select field1 from public.mytable limit 4;',
            trigger_rule='all_done',
            dag=dag_subdag,
        )

        return dag_subdag 
Run Code Online (Sandbox Code Playgroud)

函数subdag_callHive是从另一个python脚本调用的,其中定义了主Dag并且需要所有其他参数.

我只是需要能够从HiveOperator(*select*from public.mytable limit 4;*)获得结果,在这种情况下将是4个值.

返回的dag_subdag是一个对象<class'airflow.models.DAG'>并包含传递给调用的所有属性/数据,但没有关于HiveOperator所做的事情的信息.

这可能吗?如果是这样,它怎么能完成.

小智 7

您可以根据需要使用挂钩.基本上HiveOperator也是如此,他称Hive Hooks有多种方法可以处理结果.

使用PythonOperator调用一个函数,然后启动一个hive钩子.

以下示例可能对您有帮助.

代码片段:

callHook = PythonOperator(
    task_id='foo',
    python_callable=do_work,
    dag=dag
)

def do_work():
    hiveserver = HiveServer2Hook()
    hql = "SELECT COUNT(*) FROM foo.bar"
    row_count = hiveserver.get_records(hql, schema='foo')
    print row_count[0][0]
Run Code Online (Sandbox Code Playgroud)

所有可用的方法都可以在这里找到:https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/hive_hooks.py