如何在气流中传递不记名令牌

Moh*_*han 7 airflow airflow-scheduler

我有一份包含 3 项任务的工作 1) 使用 POST 请求获取令牌 2) 获取令牌值并将其存储在变量中 3) 使用步骤 2 中的令牌发出 GET 请求并传递不记名令牌

问题是步骤 3 不起作用,并且我收到 HTTP 错误。我能够在步骤 2 中打印 token 的值并在代码中进行验证

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
token ="mytoken" //defined with some value which will be updated later

get_token = SimpleHttpOperator(
        task_id='get_token',
        method='POST',
        headers={"Authorization": "Basic xxxxxxxxxxxxxxx=="},
        endpoint='/token?username=user&password=pass&grant_type=password',
        http_conn_id = 'test_http',
        trigger_rule="all_done",
        xcom_push=True,
        dag=dag
    )

def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='get_token')
    print("printing token")
    print value
    wjdata = json.loads(value)
    print(wjdata['access_token'])
    token=wjdata['access_token']
    print token


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=pull_function,
    dag=dag,
)

get_config = SimpleHttpOperator(
        task_id='get_config',
        method='GET',
        headers={"Authorization": "Bearer " + token},
        endpoint='someendpoint',
        http_conn_id = 'test_conn',
        trigger_rule="all_done",
        xcom_push=True,
        dag=dag
    )

get_token >> run_this >> get_config
Run Code Online (Sandbox Code Playgroud)

bos*_*jak 5

您存储token为“全局”变量的方式将不起作用。Dag 定义文件(定义任务的脚本)与执行每个任务的运行时上下文不同。每个任务都可以在单独的线程、进程甚至另一台机器上运行,具体取决于执行器。在任务之间传递数据的方式不是通过全局变量,而是使用 XCom - 您已经部分这样做了。尝试以下操作: - 远程全局token变量 - inpull_function而不是print tokendo return token- 这将再次将值推送到 XCom,以便下一个任务可以访问它 - 在下一个任务中从 XCom 访问令牌。

最后一步有点棘手,因为您使用的是SimpleHttpOperator,并且模板化字段只有endpointdata,但不是headers。例如,如果您想data从前一个任务的 XCom 中传递一些内容,您可以执行如下操作:

get_config = SimpleHttpOperator(
        task_id='get_config',
        endpoint='someendpoint',
        http_conn_id = 'test_conn',
        dag=dag,
        data='{{ task_instance.xcom_pull(task_ids="print_the_context", key="some_key") }}'
    )
Run Code Online (Sandbox Code Playgroud)

但不幸的是,您不能对标头执行相同的操作,因此您必须通过 PythonOperator“手动”执行此操作,或者您可以继承SimpleHttpOperator并创建自己的标头,例如:

class HeaderTemplatedHttpOperator(SimpleHttpOperator):
    template_fields = ('endpoint', 'data', 'headers')  # added 'headers' headers
Run Code Online (Sandbox Code Playgroud)

然后使用那个,比如:

get_config = HeaderTemplatedHttpOperator(
        task_id='get_config',
        endpoint='someendpoint',
        http_conn_id = 'test_conn',
        dag=dag,
        headers='{{ task_instance.xcom_pull(task_ids="print_the_context") }}'
    )
Run Code Online (Sandbox Code Playgroud)

请记住,我没有对此进行测试,这只是为了解释这个概念。尝试一下这个方法,你应该就能到达那里。