Ros*_*ndo 2 google-cloud-dataflow apache-beam
如何使用 Apache Beam Direct 运行程序通过 GOOGLE_APPLICATION_CREDENTIALS 进行身份验证?我不想使用 gcloud 帐户进行身份验证。我有一个服务帐户(json),我将其设置为系统变量。如何让 Apache Beam 程序(作为 DirectRunner 运行)使用 GOOGLE_APPLICATION_CREDENTIALS 进行身份验证?
我的用例是访问 Apache Beam 程序中的 GCP Pub/Sub 资源,因此需要进行身份验证
截至今天,在当前版本的 Python SDK 2.9.0 中,Google 云 PubSub 功能仍在积极开发中,目前仅设计用于流处理用途,因此您无法将它们用作发布到 PubSub 的管道的一部分。
\n\n您仍然可以安装 Google PubSub 客户端,但由于 Apache Beam 运行时的具体情况,您会偶然发现一些限制和细微差别,例如:
\n\ngoogle-cloud-pubsub并指定 Apache Beam 的依赖项以下是获取能够将已处理集合中的元素发布到 Google 云 PubSub 服务的管道的基本演练。
\n\n让我们假设一个基本结构:
\n\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 my_stuff\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 my_package.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 .gitignore\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 main.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 README.md\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 setup.py\nRun Code Online (Sandbox Code Playgroud)\n\n安装google-cloud-pubsub(假设 pip: pip install google-cloud-pubsub),现在您将遇到提供依赖项的障碍,我建议遵循文档的最后一部分并提供setup.py一些元数据和您的包依赖项:
from setuptools import find_packages, setup\n\nsetup(\n name="my_stuff",\n version="0.1.0",\n description="My Pipeline for DirectRunner that publishes to Google Cloud PubSub",\n long_description=open("README.md").read(),\n classifiers=[\n "Programming Language :: Python",\n ],\n author="John Doe",\n author_email="john.doe@example.com",\n url="https://example.com/",\n license="proprietary",\n packages=find_packages(),\n include_package_data=True,\n zip_safe=True,\n install_requires=[\'google-cloud-pubsub==0.35.4\'],\n)\nRun Code Online (Sandbox Code Playgroud)\n\n您可以使用pip freeze | grep google-cloud-pubsub来获取确切安装的版本。
如果您只是尝试使用具有 PubSub 客户端实例的发布函数,那么您会从 Apache Beam 收到一个奇怪的错误,指出它无法反序列化它。\n为了克服这个问题,您可以创建一个可调用类并按照pickle 文档Map实现一些方法克服序列化问题。
以下是 PubSub 发布者实例化客户端的基本示例:
\n\nclass Publisher(object):\n """\n PubSub publisher for the beam pipeline.\n """\n\n @staticmethod\n def init_client():\n return pubsub_v1.PublisherClient(credentials=\'TODO: Get credentials\')\n\n def __init__(self, topic):\n self.topic = topic\n self.client = self.init_client()\n\n def __getstate__(self):\n return self.topic\n\n def __setstate__(self, topic):\n self.topic = topic\n self.client = self.init_client()\n\n def __call__(self, item, *args, **kwargs):\n self.client.publish(self.topic, b\'{}\'.format(item))\nRun Code Online (Sandbox Code Playgroud)\n\n从问题中尚不清楚是否需要重用管道运行时的凭据或需要单独指定。\n指定凭据的方法很少。您可以使用构建它们service_account.Credentials或使用运行时重用凭据GoogleCredentials。
from google.cloud import pubsub_v1\nfrom google.oauth2 import service_account\n\nclient1 = pubsub_v1.PublisherClient(\n credentials=service_account.Credentials.from_service_account_info({\n "type": "service_account",\n "project_id": "****",\n "private_key_id": "****",\n "private_key": "-----BEGIN PRIVATE KEY-----\\n****\\n-----END PRIVATE KEY-----\\n",\n "client_email": "****",\n "client_id": "****",\n "auth_uri": "https://accounts.google.com/o/oauth2/auth",\n "token_uri": "https://oauth2.googleapis.com/token",\n "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",\n "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/***"\n })\n)\nRun Code Online (Sandbox Code Playgroud)\n\nfrom google.cloud import pubsub_v1\nfrom google.oauth2 import service_account\nimport os\n\nclient2 = pubsub_v1.PublisherClient(\n credentials=service_account.Credentials.from_service_account_file(\n os.environ["GOOGLE_APPLICATION_CREDENTIALS"]\n )\n)\nRun Code Online (Sandbox Code Playgroud)\n\nfrom google.cloud import pubsub_v1\nfrom oauth2client.client import GoogleCredentials\n\nclient3 = pubsub_v1.PublisherClient(\n credentials=GoogleCredentials.get_application_default()\n)\nRun Code Online (Sandbox Code Playgroud)\n\nPublisher现在您可以像任何其他转换一样在管道中使用:
published = (pipeline | "Publish" >> beam.Map(Publisher("pub/sub/topic")))\nRun Code Online (Sandbox Code Playgroud)\n\n只是不要忘记您需要--setup_file /absolute/path/to/setup.py为管道运行程序添加参数。
| 归档时间: |
|
| 查看次数: |
4257 次 |
| 最近记录: |