如何使用 Apache Beam Direct 运行程序通过 GOOGLE_APPLICATION_CREDENTIALS 进行身份验证

Ros*_*ndo 2 google-cloud-dataflow apache-beam

如何使用 Apache Beam Direct 运行程序通过 GOOGLE_APPLICATION_CREDENTIALS 进行身份验证?我不想使用 gcloud 帐户进行身份验证。我有一个服务帐户(json),我将其设置为系统变量。如何让 Apache Beam 程序(作为 DirectRunner 运行)使用 GOOGLE_APPLICATION_CREDENTIALS 进行身份验证?

我的用例是访问 Apache Beam 程序中的 GCP Pub/Sub 资源,因此需要进行身份验证

mis*_*ope 5

前言

\n\n

截至今天,在当前版本的 Python SDK 2.9.0 中,Google 云 PubSub 功能仍在积极开发中,目前仅设计用于流处理用途,因此您无法将它们用作发布到 PubSub 的管道的一部分。

\n\n

您仍然可以安装 Google PubSub 客户端,但由于 Apache Beam 运行时的具体情况,您会偶然发现一些限制和细微差别,例如:

\n\n
    \n
  • 您必须安装google-cloud-pubsub并指定 Apache Beam 的依赖项
  • \n
  • 克服 Apache Beam 管道已安装的 PubSub 客户端的序列化不兼容性
  • \n
  • 以某种方式显式地为 PubSub 客户端提供身份验证凭据
  • \n
\n\n

以下是获取能够将已处理集合中的元素发布到 Google 云 PubSub 服务的管道的基本演练。

\n\n

让我们假设一个基本结构:

\n\n
\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 my_stuff\n\xe2\x94\x82   \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82   \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 my_package.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 .gitignore\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 main.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 README.md\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 setup.py\n
Run Code Online (Sandbox Code Playgroud)\n\n

获取 PubSub 客户端

\n\n

安装google-cloud-pubsub(假设 pip: pip install google-cloud-pubsub),现在您将遇到提供依赖项的障碍,我建议遵循文档的最后一部分并提供setup.py一些元数据和您的包依赖项:

\n\n
from setuptools import find_packages, setup\n\nsetup(\n    name="my_stuff",\n    version="0.1.0",\n    description="My Pipeline for DirectRunner that publishes to Google Cloud PubSub",\n    long_description=open("README.md").read(),\n    classifiers=[\n        "Programming Language :: Python",\n    ],\n    author="John Doe",\n    author_email="john.doe@example.com",\n    url="https://example.com/",\n    license="proprietary",\n    packages=find_packages(),\n    include_package_data=True,\n    zip_safe=True,\n    install_requires=[\'google-cloud-pubsub==0.35.4\'],\n)\n
Run Code Online (Sandbox Code Playgroud)\n\n

您可以使用pip freeze | grep google-cloud-pubsub来获取确切安装的版本。

\n\n

使 PubSub 客户端可序列化

\n\n

如果您只是尝试使用具有 PubSub 客户端实例的发布函数,那么您会从 Apache Beam 收到一个奇怪的错误,指出它无法反序列化它。\n为了克服这个问题,您可以创建一个可调用类并按照pickle 文档Map实现一些方法克服序列化问题。

\n\n

以下是 PubSub 发布者实例化客户端的基本示例:

\n\n
class Publisher(object):\n    """\n    PubSub publisher for the beam pipeline.\n    """\n\n    @staticmethod\n    def init_client():\n        return pubsub_v1.PublisherClient(credentials=\'TODO: Get credentials\')\n\n    def __init__(self, topic):\n        self.topic = topic\n        self.client = self.init_client()\n\n    def __getstate__(self):\n        return self.topic\n\n    def __setstate__(self, topic):\n        self.topic = topic\n        self.client = self.init_client()\n\n    def __call__(self, item, *args, **kwargs):\n        self.client.publish(self.topic, b\'{}\'.format(item))\n
Run Code Online (Sandbox Code Playgroud)\n\n

指定凭证

\n\n

从问题中尚不清楚是否需要重用管道运行时的凭据或需要单独指定。\n指定凭据的方法很少。您可以使用构建它们service_account.Credentials或使用运行时重用凭据GoogleCredentials

\n\n

硬编码凭证

\n\n
from google.cloud import pubsub_v1\nfrom google.oauth2 import service_account\n\nclient1 = pubsub_v1.PublisherClient(\n    credentials=service_account.Credentials.from_service_account_info({\n        "type": "service_account",\n        "project_id": "****",\n        "private_key_id": "****",\n        "private_key": "-----BEGIN PRIVATE KEY-----\\n****\\n-----END PRIVATE KEY-----\\n",\n        "client_email": "****",\n        "client_id": "****",\n        "auth_uri": "https://accounts.google.com/o/oauth2/auth",\n        "token_uri": "https://oauth2.googleapis.com/token",\n        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",\n        "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/***"\n    })\n)\n
Run Code Online (Sandbox Code Playgroud)\n\n

来自 JSON 文件的凭证(通过操作系统环境变量)

\n\n
from google.cloud import pubsub_v1\nfrom google.oauth2 import service_account\nimport os\n\nclient2 = pubsub_v1.PublisherClient(\n    credentials=service_account.Credentials.from_service_account_file(\n        os.environ["GOOGLE_APPLICATION_CREDENTIALS"]\n    )\n)\n
Run Code Online (Sandbox Code Playgroud)\n\n

重用运行时凭据(用于管道执行)

\n\n
from google.cloud import pubsub_v1\nfrom oauth2client.client import GoogleCredentials\n\nclient3 = pubsub_v1.PublisherClient(\n    credentials=GoogleCredentials.get_application_default()\n)\n
Run Code Online (Sandbox Code Playgroud)\n\n

用法

\n\n

Publisher现在您可以像任何其他转换一样在管道中使用:

\n\n
published = (pipeline | "Publish" >> beam.Map(Publisher("pub/sub/topic")))\n
Run Code Online (Sandbox Code Playgroud)\n\n

只是不要忘记您需要--setup_file /absolute/path/to/setup.py为管道运行程序添加参数。

\n