Mar*_*Dam 5 python google-cloud-sql google-cloud-dataflow
是否有任何指南可用于使用 Google Cloud SQL 作为数据流读取源和/或接收器?
在Apache Beam Python SDK 2.1.0 文档中,没有一章提到 Google Cloud SQL。但有关于 BigQuery 的文章。
当我阅读教程Performing ETL from a Relational Database into BigQuery时,我看到他们在此过程中使用导出到文件的数据作为源。这意味着中间必须有一个出口步骤,但这并不理想。
具体使用Cloud SQL时有什么需要注意的具体问题吗?对于源和接收器?
Beam Python SDK 没有内置转换来从 MySQL/Postgres 数据库读取数据。尽管如此,编写自定义转换来执行此操作应该不会太麻烦。你可以这样做:
with beam.Pipeline() as p:
query_result_pc = (p
| beam.Create(['select a,b,c from table1'])
| beam.ParDo(QueryMySqlFn(host='...', user='...'))
| beam.Reshuffle())
Run Code Online (Sandbox Code Playgroud)
要连接到 MySQL,我们将使用 mysql 特定的库 mysql.connector,但您可以使用适用于 Postgres/etc 的库。
您的查询函数是:
import mysql.connector
class QueryMySqlFn(beam.DoFn):
def __init__(self, **server_configuration):
self.config = server_configuration
def start_bundle(self):
self.mydb = mysql.connector.connect(**self.config)
self.cursor = mydb.cursor()
def process(self, query):
self.cursor.execute(query)
for result in self.cursor:
yield result
Run Code Online (Sandbox Code Playgroud)
对于 Postgres,您可以使用psycopg2
或任何其他允许您连接到它的库:
import psycopg2
class QueryPostgresFn(beam.DoFn):
def __init__(self, **server_config):
self.config = server_config
def process(self, query):
con = psycopg2.connect(**self.config)
cur = con.cursor()
cur.execute(query)
return cur.fetchall()
Run Code Online (Sandbox Code Playgroud)
常问问题
beam.Reshuffle
在那里有一个转变?- 因为QueryMySqlFn
不并行从数据库读取数据。重新洗牌将确保我们的数据在下游并行化以进行进一步处理。