我目前正在从事ETL Dataflow作业(使用Apache Beam Python SDK),该作业从CloudSQL查询数据(带有psycopg2
和自定义ParDo
)并将其写入BigQuery。我的目标是创建一个数据流模板,该模板可以使用Cron作业从AppEngine开始。
我有一个使用DirectRunner在本地工作的版本。为此,我使用CloudSQL(Postgres)代理客户端,以便可以连接到127.0.0.1上的数据库。
当将DataflowRunner与自定义命令一起使用来在setup.py脚本中启动代理时,该作业将不会执行。它坚持重复此日志消息:
Setting node annotation to enable volume controller attach/detach
我的setup.py的一部分看起来如下:
CUSTOM_COMMANDS = [
['echo', 'Custom command worked!'],
['wget', 'https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64', '-O', 'cloud_sql_proxy'],
['echo', 'Proxy downloaded'],
['chmod', '+x', 'cloud_sql_proxy']]
class CustomCommands(setuptools.Command):
"""A setuptools Command class able to run arbitrary commands."""
def initialize_options(self):
pass
def finalize_options(self):
pass
def RunCustomCommand(self, command_list):
print('Running command: %s' % command_list)
logging.info("Running custom commands")
p = subprocess.Popen(
command_list,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Can use communicate(input='y\n'.encode()) if the command run …
Run Code Online (Sandbox Code Playgroud) python google-cloud-sql google-cloud-dataflow apache-beam cloud-sql-proxy