如何从 Google Cloud Composer 调用云函数?

Sne*_*ngh 5 python google-cloud-platform airflow google-cloud-functions google-cloud-composer

对于一个要求,我想从云作曲家管道内部调用/调用云函数,但我找不到太多相关信息,我尝试使用 SimpleHTTP 气流运算符,但收到此错误:

[2021-09-10 10:35:46,649] {taskinstance.py:1503} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1158, in 
_run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1333, in 
_prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1363, in 
_execute_task
result = task_copy.execute(context=context)
File "/home/airflow/gcs/dags/to_gcf.py", line 51, in execute
if not self.response_check(response):
File "/home/airflow/gcs/dags/to_gcf.py", line 83, in <lambda>
response_check=lambda response: False if len(response.json()) == 0 else True,
File "/opt/python3.8/lib/python3.8/site-packages/requests/models.py", line 900, in json
return complexjson.loads(self.text, **kwargs)
File "/opt/python3.8/lib/python3.8/json/__init__.py", line 357, in loads
return _default_decoder.decode(s)
File "/opt/python3.8/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/opt/python3.8/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
Run Code Online (Sandbox Code Playgroud)

提前致谢!!

Sen*_*ong 6

我遇到了和你一样的问题,但我通过研究 Google 的 Airflow 2.0 提供程序包并使用 PythonOperator 设法解决了这个问题。

from airflow.providers.google.common.utils import id_token_credentials as id_token_credential_utils
import google.auth.transport.requests
from google.auth.transport.requests import AuthorizedSession

def invoke_cloud_function():

  url = "<your_cloud_function_url>" #the url is also the target audience. 
  request = google.auth.transport.requests.Request()  #this is a request for obtaining the the credentials
  id_token_credentials = id_token_credential_utils.get_default_id_token_credentials(url, request=request) # If your cloud function url has query parameters, remove them before passing to the audience 

  resp = AuthorizedSession(id_token_credentials).request("GET", url=url) # the authorized session object is used to access the Cloud Function

  print(resp.status_code) # should return 200
  print(resp.content) # the body of the HTTP response

Run Code Online (Sandbox Code Playgroud)

因此,调用该函数如下:

    task = PythonOperator(task_id="invoke_cf", python_callable=invoke_cloud_function)
Run Code Online (Sandbox Code Playgroud)

据我了解,访问经过身份验证的 HTTP Cloud Function 严格需要基于 ID Token 的凭证。因此,要获取所需类型的凭据,get_default_id_token_credentials()请执行应用程序默认凭据 (ADC) 授权流程,该流程是从环境变量、已知位置获取凭据的过程。或 Compute Engine 元数据服务器。Composer 应该通过环境变量提供关联的服务帐户密钥文件(可能GOOGLE_APPLICATION_CREDENTIALS)。

拥有正确类型的凭据后,您可以使用 AuthorizedSessions 对象来验证您对云功能的请求。