Cloud SQL import permission issue with a Cloud Storage bucket


I am writing a Cloud Function that:

  • exports a Cloud SQL (PostgreSQL) database to a file in a Cloud Storage bucket
  • re-imports it into another Cloud SQL instance/database (also PostgreSQL)

Note: I want this code to run on its own every night to copy the production database to a staging environment, so I plan to trigger it with Cloud Scheduler.
If you have a better/simpler way to do this within GCP, I'm all ears :)
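For context, this is roughly how I plan to set that trigger up with the google-cloud-scheduler client (the project, region, job name and function URL below are placeholders, not my actual values):

from google.cloud import scheduler_v1


def create_nightly_trigger(project: str, region: str, function_url: str):
    # Creates a Cloud Scheduler job that calls the function's HTTP endpoint
    # every night at 02:00 UTC. All names here are placeholders.
    client = scheduler_v1.CloudSchedulerClient()
    parent = f"projects/{project}/locations/{region}"
    job = {
        "name": f"{parent}/jobs/nightly-db-clone",
        "schedule": "0 2 * * *",
        "time_zone": "Etc/UTC",
        "http_target": {"uri": function_url, "http_method": "POST"},
    }
    return client.create_job(parent=parent, job=job)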

Here is my code (the actual function, clone_db, is at the bottom of the file):

from os import getenv
from datetime import datetime
from time import sleep

from googleapiclient import discovery
from googleapiclient.errors import HttpError
from oauth2client.client import GoogleCredentials
from google.cloud import storage

GS_BUCKET = getenv("GS_BUCKET")
GS_FOLDER = "sql-exports"
GS_EXPORT_PATH = f"gs://{GS_BUCKET}/{GS_FOLDER}"


def __sql_file_name(db: str, timestamp: datetime):
    return f"{db}-{timestamp.strftime('%Y-%m-%d')}.sql.gz"


def __sql_file_uri(db: str, timestamp: datetime):
    return f"{GS_EXPORT_PATH}/{__sql_file_name(db, timestamp)}"


def __export_source_db(service, project: str, timestamp: datetime, instance: str, db: str):
    context = {
        "exportContext": {
            "kind": "sql#exportContext",
            "fileType": "SQL",
            "uri": __sql_file_uri(db, timestamp),
            "databases": [db],
        }
    }

    return service.instances().export(project=project, instance=instance, body=context).execute()


def __import_target_db(service, project: str, timestamp: datetime, instance: str, db: str):
    context = {
        "importContext": {
            "kind": "sql#importContext",
            "fileType": "SQL",
            "uri": __sql_file_uri(db, timestamp),
            "database": db,
        }
    }

    return service.instances().import_(project=project, instance=instance, body=context).execute()


def __drop_db(service, project: str, instance: str, db: str):
    try:
        return service.databases().delete(project=project, instance=instance, database=db).execute()
    except HttpError as e:
        if e.resp.status == 404:
            return {"status": "DONE"}
        else:
            raise e


def __create_db(service, project: str, instance: str, db: str):
    database = {
        "name": db,
        "project": project,
        "instance": instance,
    }

    return service.databases().insert(project=project, instance=instance, body=database).execute()


def __update_export_permissions(file_name: str):
    client = storage.Client()
    file = client.get_bucket(GS_BUCKET).get_blob(f"{GS_FOLDER}/{file_name}")
    file.acl.user(getenv("TARGET_DB_SERVICE_ACCOUNT")).grant_read()
    file.acl.save()


def __delete_sql_file(file_name: str):
    client = storage.Client()
    bucket = client.get_bucket(GS_BUCKET)
    bucket.delete_blob(f"{GS_FOLDER}/{file_name}")


def __wait_for(operation_type, operation, service, project):
    if operation["status"] in ("PENDING", "RUNNING", "UNKNOWN"):
        print(f"{operation_type} operation in {operation['status']} status. Waiting for completion...")

        while operation['status'] != "DONE":
            sleep(1)
            operation = service.operations().get(project=project, operation=operation['name']).execute()

    print(f"{operation_type} operation completed!")


def clone_db(_):
    credentials = GoogleCredentials.get_application_default()
    service = discovery.build('sqladmin', 'v1beta4', credentials=credentials)

    # Project ID of the project that contains the instance to be exported.
    project = getenv('PROJECT_ID')

    # Cloud SQL instance ID. This does not include the project ID.
    source = {
        "instance": getenv("SOURCE_INSTANCE_ID"),
        "db": getenv("SOURCE_DB_NAME")
    }

    timestamp = datetime.utcnow()

    print(f"Exporting database {source['instance']}:{source['db']} to Cloud Storage...")
    operation = __export_source_db(service, project, timestamp, **source)

    __wait_for("Export", operation, service, project)

    print("Updating exported file permissions...")
    __update_export_permissions(__sql_file_name(source["db"], timestamp))
    print("Done.")

    target = {
        "instance": getenv("TARGET_INSTANCE_ID"),
        "db": getenv("TARGET_DB_NAME")
    }

    print(f"Dropping target database {target['instance']}:{target['db']}")
    operation = __drop_db(service, project, **target)
    __wait_for("Drop", operation, service, project)

    print(f"Creating database {target['instance']}:{target['db']}...")
    operation = __create_db(service, project, **target)
    __wait_for("Creation", operation, service, project)

    print(f"Importing data into {target['instance']}:{target['db']}...")
    operation = __import_target_db(service, project, timestamp, **target)
    __wait_for("Import", operation, service, project)

    print("Deleting exported SQL file")
    __delete_sql_file(__sql_file_name(source["db"], timestamp))
    print("Done.")


Everything works fine up to the point where I try to import the exported data into my target instance.

When it calls import_, the function fails with the following error:

Error: function crashed. Details:
<HttpError 403 when requesting https://www.googleapis.com/sql/v1beta4/projects/<project_id>/instances/<instance_id>/import?alt=json returned "The service account does not have the required permissions for the bucket.">


I have read about this error here and in many other Q&As online, but I still can't figure out how to make this work.
Here is what I have done:

  • The Cloud Function runs as my "Compute Engine default service account", which has the Project Editor role in IAM
  • The target Cloud SQL instance's service account has been added to the bucket's permissions as Storage Object Admin. I have tried various other role combinations (legacy reader/owner, Storage Object Viewer, etc.) to no avail
  • As you can see in the function's code, I specifically grant the target instance's service account read access to the exported file, and this is correctly reflected on the object's permissions in Cloud Storage:

GCS object permissions (screenshot)

  • I have tried disabling object-level permissions on this bucket and making sure the permissions from my first point above were set correctly, but that did not work either (a sketch of the equivalent bucket-level grant I applied is shown right after this list)
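For reference, this is roughly the bucket-level grant I applied, written out with the storage client; the bucket name and service account below are placeholders for my actual values:

from google.cloud import storage


def grant_bucket_access(bucket_name: str, service_account: str,
                        role: str = "roles/storage.objectAdmin"):
    # Grants the given role on the whole bucket to the Cloud SQL instance's
    # service account (I applied the same binding by hand in the console).
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    policy = bucket.get_iam_policy(requested_policy_version=3)
    policy.bindings.append(
        {"role": role, "members": {f"serviceAccount:{service_account}"}}
    )
    bucket.set_iam_policy(policy)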

Interestingly, when I manually import the same file into the same instance from the GCP Cloud SQL console, everything works fine.
Once it is done, I can see that the permissions on my exported file have been updated to include the instance's service account as a Reader, which is exactly the behaviour I tried to reproduce in my code at the end.
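For what it's worth, this is roughly how I check the object ACL after the export to confirm the target instance's service account is listed (bucket name and blob path are placeholders):

from google.cloud import storage


def print_export_acl(bucket_name: str, blob_path: str):
    # Prints the ACL entries on the exported file so I can confirm the target
    # instance's service account shows up with the READER role.
    blob = storage.Client().get_bucket(bucket_name).get_blob(blob_path)
    blob.acl.reload()
    for entry in blob.acl:
        print(entry)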

So what am I missing here?
Which permissions should I set, and for which service account, to make this work?