Stuck trying to train multiple word embedding models with PySpark


I'm happy to finally be posting my first question, but please nudge me if I'm unclear or break standard etiquette. I sincerely appreciate any help I can get.

I'm trying to use PySpark (in Databricks) to train embeddings for many corpora (one per author) in parallel. Each author's corpus is at most 1 GB.

The corpora DataFrame (`df`) has the form:

+----------------+------------------------------------------------------------+
|          author|                                                      corpus|
+----------------+------------------------------------------------------------+
|            john| [["hello"], ["these", "are", "john's", "thoughts"]]        |
|           steve| [["hello"], ["these", "are", "steve's", "thoughts"]]       |
|          markus| [["hello"], ["these", "are", "markus's", "thoughts"]]      |
+----------------+------------------------------------------------------------+

Past attempts:

  • I defined a UDF that applies gensim's `Word2Vec` to each row above. This works for small corpora, but no matter how large I make `spark.executor.memory`, I hit OutOfMemory errors that cost me executors, cause indefinite hangs, etc.
  • Now I'm trying to `explode` the DataFrame above so that each row holds a single sentence. I figured I would then do a `df.groupBy('author')` and define a UDF that fits Spark's `Word2Vec` model on each group and saves it. Unfortunately, even with only a few authors, repartitioning, etc., it won't let me explode the DataFrame without hanging indefinitely, so I'm stuck with the DataFrame shown above.

Cluster:

  • 1 driver: 56 GB memory, 16 cores
  • 1 to 8 workers (autoscaling): 56 GB memory, 16 cores each

Configuration (inspired by here):

yarn.nodemanager.pmem-check-enabled false
spark.databricks.delta.preview.enabled true
spark.executor.cores 5
spark.executor.memory 16gb
spark.executor.instances 26
spark.driver.memory 16gb
spark.yarn.executor.memoryOverhead 2gb
yarn.nodemanager.vmem-check-enabled false
spark.default.parallelism 260
spark.driver.cores 5
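For completeness, the same settings expressed as a `SparkSession` builder (a sketch only; on Databricks these normally go in the cluster's Spark config UI rather than in code):

```python
from pyspark.sql import SparkSession

# Sketch: the configuration above as builder options.
builder = (
    SparkSession.builder
    .config("spark.executor.cores", "5")
    .config("spark.executor.memory", "16g")
    .config("spark.executor.instances", "26")
    .config("spark.driver.memory", "16g")
    .config("spark.driver.cores", "5")
    .config("spark.yarn.executor.memoryOverhead", "2g")
    .config("spark.default.parallelism", "260")
)
# spark = builder.getOrCreate()  # deferred; requires a running JVM
```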

Performance:

Memory usage is low.

Error on the executors:

WARN HangingTaskDetector: Task 1527 is probably not making progress because its metrics (Map(internal.metrics.shuffle.read.localBlocksFetched -> 0, internal.metrics.shuffle.read.remoteBytesReadToDisk -> 0, internal.metrics.shuffle.write.bytesWritten -> 0, internal.metrics.output.recordsWritten -> 0, internal.metrics.shuffle.write.recordsWritten -> 0, internal.metrics.memoryBytesSpilled -> 0, internal.metrics.shuffle.read.remoteBytesRead -> 0, internal.metrics.diskBytesSpilled -> 0, internal.metrics.shuffle.read.localBytesRead -> 0, internal.metrics.shuffle.read.recordsRead -> 0, internal.metrics.output.bytesWritten -> 0, internal.metrics.input.bytesRead -> 119650145, internal.metrics.input.recordsRead -> 4096, internal.metrics.shuffle.read.remoteBlocksFetched -> 0))

Driver stdout logs:

2021-07-29T02:27:25.955+0000: [GC (Allocation Failure) [PSYoungGen: 5309440K->135150K(5444608K)] 5520626K->401147K(16629760K), 0.0788413 secs] [Times: user=0.22 sys=0.14, real=0.08 secs]

Logs:

21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_29_piece0 on 10.161.179.16:42153 in memory (size: 7.4 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 10.161.179.6:40551 in memory (size: 28.6 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 10.161.179.19:38301 in memory (size: 14.2 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_37_piece0 on 10.161.179.19:36657 in memory (size: 13.4 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 10.161.179.19:36657 in memory (size: 14.2 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_31_piece0 on 10.161.179.10:45589 in memory (size: 11.5 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_35_piece0 on 10.161.179.10:40843 in memory (size: 8.3 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 10.161.179.16:42707 in memory (size: 14.2 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.161.179.19:38301 in memory (size: 8.0 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.161.179.19:36657 in memory (size: 8.0 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.161.179.6:37313 in memory (size: 8.0 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 10.161.179.6:37313 in memory (size: 14.2 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_8_piece0 on 10.161.179.6:37313 in memory (size: 62.5 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_37_piece0 on 10.161.179.8:41599 in memory (size: 13.4 KiB, free: 8.4 GiB)
21/07/29 02:29:16 INFO DriverCorral: DBFS health check ok
21/07/29 02:29:17 INFO HikariDataSource: metastore-monitor - Starting...
21/07/29 02:29:17 INFO HikariDataSource: metastore-monitor - Start completed.
21/07/29 02:29:17 INFO HikariDataSource: metastore-monitor - Shutdown initiated...
21/07/29 02:29:17 INFO HikariDataSource: metastore-monitor - Shutdown completed.
21/07/29 02:29:17 INFO MetastoreMonitor: Metastore healthcheck successful (connection duration = 84 milliseconds)
21/07/29 02:29:25 INFO HiveMetaStore: 1: get_database: default
21/07/29 02:29:25 INFO audit: ugi=root  ip=unknown-ip-addr  cmd=get_database: default   
21/07/29 02:29:25 INFO DriverCorral: Metastore health check ok

So my question is twofold:

  1. Am I doing something wrong by simply exploding the rows and then applying a UDF after `groupBy('author')` to train the Word2Vec embeddings? I understand that `explode` is very expensive because it creates an enormous number of rows, but it seems like the best option.
  2. Is there a more efficient or different approach that avoids `explode` entirely and sidesteps the hanging?

Please let me know if there is any other information I can provide that might help, and I welcome any suggestions that would help me ask clearer questions in the future.

Update: after changing `spark.sql.shuffle.partitions` to a large number, things did improve, but as executors finish tasks they keep getting created and destroyed due to "worker lost" or "Remote RPC client disassociated. Likely..." errors (which have plagued me in the past despite trying many different driver/executor memory/core configurations). The event timeline looks like this:
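For reference, that shuffle-partition change is a one-liner at runtime (the value below is only an illustration, not the number I actually used):

```python
# Sketch: raise the shuffle partition count for subsequent jobs.
spark.conf.set("spark.sql.shuffle.partitions", "2000")
```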

[event timeline screenshot]

So my questions above still stand, but it feels like I'm one step closer to finding the right configuration.