GCP Composer v1.18.6 and 2.0.10 are incompatible with CloudSqlProxyRunner

not*_*ami 5 google-cloud-sql airflow cloud-sql-proxy google-cloud-composer

In my Composer Airflow DAGs, I have been using CloudSqlProxyRunner to connect to my Cloud SQL instance.

However, after updating Google Cloud Composer from v1.18.4 to 1.18.6, my DAGs started failing with a strange error:

[2022-04-22, 23:20:18 UTC] {cloud_sql.py:462} INFO - Downloading cloud_sql_proxy from https://dl.google.com/cloudsql/cloud_sql_proxy.linux.x86_64 to /home/airflow/dXhOYoU_cloud_sql_proxy.tmp
[2022-04-22, 23:20:18 UTC] {taskinstance.py:1702} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1330, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1457, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1513, in _execute_task
    result = execute_callable(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/decorators/base.py", line 134, in execute
    return_value = super().execute(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 174, in execute
    return_value = self.execute_callable()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 185, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/real_time_scoring_pipeline.py", line 99, in get_messages_db
    with SQLConnection() as sql_conn:
  File "/home/airflow/gcs/dags/helpers/helpers.py", line 71, in __enter__
    self.proxy_runner.start_proxy()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_sql.py", line 524, in start_proxy
    self._download_sql_proxy_if_needed()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_sql.py", line 474, in _download_sql_proxy_if_needed
    raise AirflowException(
airflow.exceptions.AirflowException: The cloud-sql-proxy could not be downloaded. Status code = 404. Reason = Not Found

Checking manually, https://dl.google.com/cloudsql/cloud_sql_proxy.linux.x86_64 does indeed return a 404.

Looking at _download_sql_proxy_if_needed, the function that raises the exception, it builds the download URL from the operating system name and from processor = os.uname().machine.

So, for whatever reason, in both of these newest Composer images, processor = os.uname().machine returns x86_64. Previously it returned amd64, and https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64 is in fact a valid link to the binary we need.
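To make the failure mode concrete, here is a minimal sketch of how such a URL could be assembled and normalized. The template is inferred from the URLs in the log output above, and build_proxy_url and ARCH_ALIASES are hypothetical names used only for illustration; they are not part of the Airflow provider. Only the amd64 URL is confirmed to work in this post.

```python
import platform

# Template inferred from the URLs shown in the log output; the provider's
# own constant may be defined differently (assumption).
DOWNLOAD_URL_TEMPLATE = "https://dl.google.com/cloudsql/cloud_sql_proxy.{}.{}"

# Hypothetical alias table: machine names as reported by the OS, mapped to
# the architecture suffix that the download server accepts.
ARCH_ALIASES = {"x86_64": "amd64"}


def build_proxy_url(system: str, machine: str) -> str:
    """Return the download URL for the given OS name and machine type."""
    arch = ARCH_ALIASES.get(machine, machine)  # unknown values pass through
    return DOWNLOAD_URL_TEMPLATE.format(system.lower(), arch)


if __name__ == "__main__":
    # On the affected images, platform.machine() reports x86_64.
    print(build_proxy_url(platform.system(), platform.machine()))
```

With a mapping like this, an x86_64 machine resolves to the amd64 URL that is known to exist, while any other machine value is passed through unchanged.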

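I have also reproduced this error in Composer 2.0.10.

I am still investigating possible workarounds, but I am posting this here in case someone else has run into the issue and found a workaround, and to raise it with Google engineers (who, according to Composer's documentation, monitor this tag).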

not*_*ami 3

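My current workaround is to patch CloudSqlProxyRunner so that it uses a hard-coded, correct URL:

import os
import shutil
from inspect import signature

import httpx
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.cloud_sql import CloudSqlProxyRunner


class PatchedCloudSqlProxyRunner(CloudSqlProxyRunner):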
    """
    This is a patched version of CloudSqlProxyRunner to provide a workaround for an incorrectly
    generated URL to the Cloud SQL proxy binary.
    """

    def _download_sql_proxy_if_needed(self) -> None:
        download_url = "https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64"
        
        # the rest of the code is taken from the original method

        proxy_path_tmp = self.sql_proxy_path + ".tmp"
        self.log.info(
            "Downloading cloud_sql_proxy from %s to %s", download_url, proxy_path_tmp
        )
        # httpx has a breaking API change (follow_redirects vs allow_redirects)
        # and this should work with both versions (cf. issue #20088)
        if "follow_redirects" in signature(httpx.get).parameters.keys():
            response = httpx.get(download_url, follow_redirects=True)
        else:
            response = httpx.get(download_url, allow_redirects=True)  # type: ignore[call-arg]
        # Check the status before writing, so that an error page is never
        # saved to disk in place of the binary
        if response.status_code != 200:
            raise AirflowException(
                "The cloud-sql-proxy could not be downloaded. "
                f"Status code = {response.status_code}. Reason = {response.reason_phrase}"
            )
        # Downloading to .tmp file first to avoid case where partially downloaded
        # binary is used by parallel operator which uses the same fixed binary path
        with open(proxy_path_tmp, "wb") as file:
            file.write(response.content)

        self.log.info(
            "Moving sql_proxy binary from %s to %s", proxy_path_tmp, self.sql_proxy_path
        )
        shutil.move(proxy_path_tmp, self.sql_proxy_path)
        os.chmod(self.sql_proxy_path, 0o744)  # Set executable bit
        self.sql_proxy_was_downloaded = True

Then instantiate it and use it exactly as you would the original CloudSqlProxyRunner:

proxy_runner = PatchedCloudSqlProxyRunner(path_prefix, instance_spec)
proxy_runner.start_proxy()
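
Since the proxy is a child process that should also be shut down again, one option is to wrap the runner in a small context manager, much like the SQLConnection helper that appears in the traceback. This is only a sketch: running_proxy is a hypothetical helper name, and it assumes nothing about the runner beyond the start_proxy() and stop_proxy() methods of CloudSqlProxyRunner.

```python
from contextlib import contextmanager


@contextmanager
def running_proxy(runner):
    """Start the proxy on entry and stop it on exit, even after an error.

    `runner` is any object with start_proxy() and stop_proxy() methods,
    for example a PatchedCloudSqlProxyRunner instance.
    """
    runner.start_proxy()
    try:
        yield runner
    finally:
        # Runs whether the body finished normally or raised.
        runner.stop_proxy()
```

It could then be used as: with running_proxy(PatchedCloudSqlProxyRunner(path_prefix, instance_spec)) as proxy_runner: followed by the database work in the body of the with block.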

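But I am hoping that someone at Google will resolve this soon, either by fixing the os.uname().machine value or by uploading a Cloud SQL proxy binary to the URL currently generated in _download_sql_proxy_if_needed.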