从 Pyspark df 写入超过 5000 万条到 PostgresSQL,最有效的方法

Che*_*van 16 postgresql bigdata apache-spark apache-spark-sql pyspark

将数百万条记录从 Spark 数据帧插入到 Postgres 表的最有效方法是 5000 万条记录。过去,我通过使用批量复制和批量大小选项从 spark 到MSSQL做到了这一点, 这也很成功。

Postgres 是否有类似的东西?

添加我尝试过的代码以及运行该过程所需的时间:

def inserter():
    start = timer()
    sql_res.write.format("jdbc").option("numPartitions","5").option("batchsize","200000")\
    .option("url", "jdbc:postgresql://xyz.com:5435/abc_db") \
    .option("dbtable", "public.full_load").option("user", "root").option("password", "password").save()
    end = timer()
    print(timedelta(seconds=end-start))
inserter()
Run Code Online (Sandbox Code Playgroud)

因此,我对 1000 万条记录执行了上述方法,并按照 中指定的numPartitions方式进行了 5 个并行连接,并且还尝试了 200k 的批量大小

该过程花费的总时间为0:14:05.760926(十四分五秒)。

有没有其他有效的方法可以减少时间?

我可以使用的有效或最佳批量大小是多少?增加我的批量大小会更快地完成工作吗?或者打开多个连接,即> 5 帮助我加快进程?

1000 万条记录平均 14 分钟还不错,但正在寻找以前会这样做的人来帮助回答这个问题。

dbu*_*osp 5

实际上,我不久前做了一些相同的工作,但使用的是 Apache Sqoop。

我想说,为了回答这个问题,我们必须尝试优化 Spark 和 PostgresSQL 之间的通信,特别是从 Spark 到 PostgreSql 的数据流。

但要小心,不要忘记 Spark 的一面。如果分区数与PostgreSQL支持的最大连接数相比过高,则执行mapPartitions是没有意义的,如果您的分区太多并且每个分区都打开一个连接,则可能会出现以下错误org.postgresql.util.PSQLException: FATAL: sorry, too many clients already.

为了调整插入过程,我将按照以下步骤解决问题:

  • 记住分区的数量很重要。检查分区数,然后根据您想要的并行连接数进行调整。您可能希望每个分区有一个连接,因此我建议检查coalesce,正如这里提到的。
  • 检查您的 postgreSQL 实例支持的最大连接数,并且您想增加该数量
  • 建议使用 COPY 命令将数据插入 PostgreSQL 。这里还有一个关于如何加速 postgreSQL 插入的更详细的答案。

最后,没有什么灵丹妙药可以完成这项工作。您可以使用我上面提到的所有技巧,但这实际上取决于您的数据和用例。