我有一份包含 3 项任务的工作 1) 使用 POST 请求获取令牌 2) 获取令牌值并将其存储在变量中 3) 使用步骤 2 中的令牌发出 GET 请求并传递不记名令牌
问题是步骤 3 不起作用,并且我收到 HTTP 错误。我能够在步骤 2 中打印 token 的值并在代码中进行验证
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
token ="mytoken" //defined with some value which will be updated later
get_token = SimpleHttpOperator(
task_id='get_token',
method='POST',
headers={"Authorization": "Basic xxxxxxxxxxxxxxx=="},
endpoint='/token?username=user&password=pass&grant_type=password',
http_conn_id = 'test_http',
trigger_rule="all_done",
xcom_push=True,
dag=dag
)
def pull_function(**context):
value = context['task_instance'].xcom_pull(task_ids='get_token')
print("printing token")
print value …Run Code Online (Sandbox Code Playgroud)