Sne*_*ngh 5 python google-cloud-platform airflow google-cloud-functions google-cloud-composer
对于一个要求,我想从云作曲家管道内部调用/调用云函数,但我找不到太多相关信息,我尝试使用 SimpleHTTP 气流运算符,但收到此错误:
[2021-09-10 10:35:46,649] {taskinstance.py:1503} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1158, in
_run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1333, in
_prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1363, in
_execute_task
result = task_copy.execute(context=context)
File "/home/airflow/gcs/dags/to_gcf.py", line 51, in execute
if not self.response_check(response):
File "/home/airflow/gcs/dags/to_gcf.py", line 83, in <lambda>
response_check=lambda response: False if len(response.json()) == 0 else True,
File "/opt/python3.8/lib/python3.8/site-packages/requests/models.py", line 900, in json
return complexjson.loads(self.text, **kwargs)
File "/opt/python3.8/lib/python3.8/json/__init__.py", line 357, in loads
return _default_decoder.decode(s)
File "/opt/python3.8/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/opt/python3.8/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
Run Code Online (Sandbox Code Playgroud)
提前致谢!!
我遇到了和你一样的问题,但我通过研究 Google 的 Airflow 2.0 提供程序包并使用 PythonOperator 设法解决了这个问题。
from airflow.providers.google.common.utils import id_token_credentials as id_token_credential_utils
import google.auth.transport.requests
from google.auth.transport.requests import AuthorizedSession
def invoke_cloud_function():
url = "<your_cloud_function_url>" #the url is also the target audience.
request = google.auth.transport.requests.Request() #this is a request for obtaining the the credentials
id_token_credentials = id_token_credential_utils.get_default_id_token_credentials(url, request=request) # If your cloud function url has query parameters, remove them before passing to the audience
resp = AuthorizedSession(id_token_credentials).request("GET", url=url) # the authorized session object is used to access the Cloud Function
print(resp.status_code) # should return 200
print(resp.content) # the body of the HTTP response
Run Code Online (Sandbox Code Playgroud)
因此,调用该函数如下:
task = PythonOperator(task_id="invoke_cf", python_callable=invoke_cloud_function)
Run Code Online (Sandbox Code Playgroud)
据我了解,访问经过身份验证的 HTTP Cloud Function 严格需要基于 ID Token 的凭证。因此,要获取所需类型的凭据,get_default_id_token_credentials()请执行应用程序默认凭据 (ADC) 授权流程,该流程是从环境变量、已知位置获取凭据的过程。或 Compute Engine 元数据服务器。Composer 应该通过环境变量提供关联的服务帐户密钥文件(可能GOOGLE_APPLICATION_CREDENTIALS)。
拥有正确类型的凭据后,您可以使用 AuthorizedSessions 对象来验证您对云功能的请求。
| 归档时间: |
|
| 查看次数: |
5257 次 |
| 最近记录: |