从 Beam 管道连接 google cloud sql postgres 实例

San*_*esh 4 python postgresql google-cloud-sql apache-beam

我想从在谷歌数据流上运行的 apache beam 管道连接谷歌云 sql postgres 实例。
我想使用 Python SDK 来完成此操作。
我无法为此找到适当的文档。
在云SQL如何指导我没有看到任何数据流文档。
https://cloud.google.com/sql/docs/postgres/

有人可以提供文档链接/github 示例吗?

moh*_*eeb 6

您可以使用来自beam-nuggets的relational_db.Writerelational_db.Read转换,如下所示:

首先安装beam-nuggests:

pip install beam-nuggets
Run Code Online (Sandbox Code Playgroud)

供阅读:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    records = p | "Reading records from db" >> relational_db.Read(
        source_config=source_config,
        table_name='months',
    )
    records | 'Writing to stdout' >> beam.Map(print)
Run Code Online (Sandbox Code Playgroud)

对于写作:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    months = p | "Reading month records" >> beam.Create([
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
    ])
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
        create_if_missing=True,
    )
    table_config = relational_db.TableConfiguration(
        name='months',
        create_if_missing=True
    )
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )
Run Code Online (Sandbox Code Playgroud)

  • @chmod_007 我不知道类似的功能。但是,您可以从管道连接到数据库,而无需“使用 cloud_sql_proxy”或“将数据库的工作线程 IP 列入白名单”,方法是在 Cloud SQL DB 上启用“私有 IP”连接,并确保 Cloud SQL DB 和数据流工作人员位于同一个谷歌云区域(并使用其私有IP连接到数据库)。有关更多信息,请查看 https://cloud.google.com/sql/docs/postgres/private-ip。 (2认同)