如何通过 Python 使用 GCP Cloud SQL 作为数据流源和/或接收器?

Mar*_*Dam 5 python google-cloud-sql google-cloud-dataflow

是否有任何指南可用于使用 Google Cloud SQL 作为数据流读取源和/或接收器?

Apache Beam Python SDK 2.1.0 文档中,没有一章提到 Google Cloud SQL。但有关于 BigQuery 的文章。

当我阅读教程Performing ETL from a Relational Database into BigQuery时,我看到他们在此过程中使用导出到文件的数据作为源。这意味着中间必须有一个出口步骤,但这并不理想。

具体使用Cloud SQL时有什么需要注意的具体问题吗?对于源和接收器?

Pab*_*blo 4

Beam Python SDK 没有内置转换来从 MySQL/Postgres 数据库读取数据。尽管如此,编写自定义转换来执行此操作应该不会太麻烦。你可以这样做:

with beam.Pipeline() as p:
  query_result_pc = (p 
                     | beam.Create(['select a,b,c from table1'])
                     | beam.ParDo(QueryMySqlFn(host='...', user='...'))
                     | beam.Reshuffle())
Run Code Online (Sandbox Code Playgroud)

要连接到 MySQL,我们将使用 mysql 特定的库 mysql.connector,但您可以使用适用于 Postgres/etc 的库。

您的查询函数是:

import mysql.connector


class QueryMySqlFn(beam.DoFn):

  def __init__(self, **server_configuration):
    self.config = server_configuration

  def start_bundle(self):
      self.mydb = mysql.connector.connect(**self.config)
      self.cursor = mydb.cursor()

  def process(self, query):
    self.cursor.execute(query)
    for result in self.cursor:
      yield result
Run Code Online (Sandbox Code Playgroud)

对于 Postgres,您可以使用psycopg2或任何其他允许您连接到它的库:

import psycopg2

class QueryPostgresFn(beam.DoFn):

  def __init__(self, **server_config):
    self.config = server_config

  def process(self, query):
    con = psycopg2.connect(**self.config)
    cur = con.cursor()

    cur.execute(query)
    return cur.fetchall()
Run Code Online (Sandbox Code Playgroud)

常问问题

  • 为什么你beam.Reshuffle在那里有一个转变?- 因为QueryMySqlFn不并行从数据库读取数据。重新洗牌将确保我们的数据在下游并行化以进行进一步处理。