如何从 Postgres RDB 到 Databricks Lakehouse Delta Lake?

LeO*_*low 6 apache-spark pyspark databricks delta-lake delta-live-tables

到底如何创建一个高效且可重用的 Databricks 工作流程来将原始 SQL 数据库转储到 Delta Lake 中。这里的一些混淆是为了实现以下目的的最佳方法:

  • 处理模式中的偏差(数据库表中的列)=> 对存储的表进行简单的覆盖可以吗?
  • 捕获数据变化(CDC)并高效合并现有数据;身份证上说。这对于关系数据库是否仍然相关?
  • Delta Live Table (DLT) 格式适合这个吗?

人们可能会想象以下过程:

  1. 迭代公共表 information_schema:
table_names = spark.read.jdbc(url=jdbcUrl, table="information_schema.tables",
                               properties=connectionProperties) \
                               .filter("table_schema = 'public'") \
                               .select("table_name") \
                               .rdd.flatMap(lambda x: x) \
                               .collect()

for table in table_names:
    ...
Run Code Online (Sandbox Code Playgroud)
  1. 然后对于每个表:
  • (A) 创建一个新的 Delta Lake 表,如果它不存在(或者可能在架构方面已经过时),否则;
  • (B) 将新数据/更新数据合并到 Delta Lake 中。

像 Airbyte 和其他公司这样的第三方供应商提供了这项服务——并不是因为它确实应该如此难以实施。但更有可能的是,由于 Databricks DLT/Delta Lake 方面此通用流程的文档或参考实现乏善可陈。

令人满意的答案将是(I)对 OP 中包含的(错误?)假设的一些背景/验证,(II)此工作流程缺少的代码,以及(III)对提出的 3 点的答案/澄清。

Eri*_*ric 0

如果源数据库并发处理事务,则 (1) 和 (2) 中的方法将不会生成一致的表副本,因为每个表的副本不会来自同一提交点。

您可以使用类似的方法pg_dump来获取数据库的一致转储,并将该文本合并到可以加载到 Spark DataFrames 中并保存的内容中。这将为您提供 Delta Lake 的一致快照。

再加上在 pubsub(如 Kafka)中捕获 Postgres 更改日志甚至可以让您以流式传输方式保持最新状态,但在实践中做到这一点相当复杂。