not*_*ami 5 google-cloud-sql airflow cloud-sql-proxy google-cloud-composer
在我的Composer Airflow DAG 中,我一直在使用 CloudSqlProxyRunner连接到我的 Cloud SQL 实例。
然而,在将 Google Cloud Composer 从 v1.18.4 更新到 1.18.6 后,我的 DAG 开始遇到一个奇怪的错误:
[2022-04-22, 23:20:18 UTC] {cloud_sql.py:462} INFO - Downloading cloud_sql_proxy from https://dl.google.com/cloudsql/cloud_sql_proxy.linux.x86_64 to /home/airflow/dXhOYoU_cloud_sql_proxy.tmp
[2022-04-22, 23:20:18 UTC] {taskinstance.py:1702} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1330, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1457, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1513, in _execute_task
result = execute_callable(context=context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/decorators/base.py", line 134, in execute
return_value = super().execute(context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 174, in execute
return_value = self.execute_callable()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 185, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/airflow/gcs/dags/real_time_scoring_pipeline.py", line 99, in get_messages_db
with SQLConnection() as sql_conn:
File "/home/airflow/gcs/dags/helpers/helpers.py", line 71, in __enter__
self.proxy_runner.start_proxy()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_sql.py", line 524, in start_proxy
self._download_sql_proxy_if_needed()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_sql.py", line 474, in _download_sql_proxy_if_needed
raise AirflowException(
airflow.exceptions.AirflowException: The cloud-sql-proxy could not be downloaded. Status code = 404. Reason = Not Found
Run Code Online (Sandbox Code Playgroud)
手动检查,https://dl.google.com/cloudsql/cloud_sql_proxy.linux.x86_64确实返回404。
查看引发异常的函数_download_sql_proxy_if_needed,它具有以下代码:
[2022-04-22, 23:20:18 UTC] {cloud_sql.py:462} INFO - Downloading cloud_sql_proxy from https://dl.google.com/cloudsql/cloud_sql_proxy.linux.x86_64 to /home/airflow/dXhOYoU_cloud_sql_proxy.tmp
[2022-04-22, 23:20:18 UTC] {taskinstance.py:1702} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1330, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1457, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1513, in _execute_task
result = execute_callable(context=context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/decorators/base.py", line 134, in execute
return_value = super().execute(context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 174, in execute
return_value = self.execute_callable()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 185, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/airflow/gcs/dags/real_time_scoring_pipeline.py", line 99, in get_messages_db
with SQLConnection() as sql_conn:
File "/home/airflow/gcs/dags/helpers/helpers.py", line 71, in __enter__
self.proxy_runner.start_proxy()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_sql.py", line 524, in start_proxy
self._download_sql_proxy_if_needed()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_sql.py", line 474, in _download_sql_proxy_if_needed
raise AirflowException(
airflow.exceptions.AirflowException: The cloud-sql-proxy could not be downloaded. Status code = 404. Reason = Not Found
Run Code Online (Sandbox Code Playgroud)
因此,无论出于何种原因,在 Composer 的这两个最新图像中,processor = os.uname().machine
都返回x86_64
。以前,它返回amd64
,并且https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64实际上是我们需要的二进制文件的有效链接。
我也在 Composer 2.0.10 中复制了这个错误。
我仍在研究可能的解决方法,但将其发布在此处,以防其他人遇到此问题,并找到了解决方法,并向 Google 工程师提出此问题(根据 Composer 的文档,他们监控此标签)。
我当前的解决方法是修补 CloudSqlProxyRunner 以硬编码正确的 URL:
class PatchedCloudSqlProxyRunner(CloudSqlProxyRunner):
"""
This is a patched version of CloudSqlProxyRunner to provide a workaround for an incorrectly
generated URL to the Cloud SQL proxy binary.
"""
def _download_sql_proxy_if_needed(self) -> None:
download_url = "https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64"
# the rest of the code is taken from the original method
proxy_path_tmp = self.sql_proxy_path + ".tmp"
self.log.info(
"Downloading cloud_sql_proxy from %s to %s", download_url, proxy_path_tmp
)
# httpx has a breaking API change (follow_redirects vs allow_redirects)
# and this should work with both versions (cf. issue #20088)
if "follow_redirects" in signature(httpx.get).parameters.keys():
response = httpx.get(download_url, follow_redirects=True)
else:
response = httpx.get(download_url, allow_redirects=True) # type: ignore[call-arg]
# Downloading to .tmp file first to avoid case where partially downloaded
# binary is used by parallel operator which uses the same fixed binary path
with open(proxy_path_tmp, "wb") as file:
file.write(response.content)
if response.status_code != 200:
raise AirflowException(
"The cloud-sql-proxy could not be downloaded. "
f"Status code = {response.status_code}. Reason = {response.reason_phrase}"
)
self.log.info(
"Moving sql_proxy binary from %s to %s", proxy_path_tmp, self.sql_proxy_path
)
shutil.move(proxy_path_tmp, self.sql_proxy_path)
os.chmod(self.sql_proxy_path, 0o744) # Set executable bit
self.sql_proxy_was_downloaded = True
Run Code Online (Sandbox Code Playgroud)
然后实例化它并像使用原始 CloudSqlProxyRunner 一样使用它:
proxy_runner = PatchedCloudSqlProxyRunner(path_prefix, instance_spec)
proxy_runner.start_proxy()
Run Code Online (Sandbox Code Playgroud)
但我希望 Google 的某个人很快能够通过修复 os.uname().machine 值或将 Cloud SQL 代理二进制文件上传到当前在_download_sql_proxy_if_needed
.
归档时间: |
|
查看次数: |
478 次 |
最近记录: |