我想任何使用过 Spark 的人都遇到过 OOM 错误,而且通常很容易找到问题的根源。不过,我对这个问题有点困惑。目前,我正在尝试使用该功能通过两个不同的分区进行保存partitionBy
。它看起来像下面这样(虚构的名字):
df.write.partitionBy("account", "markers")
.mode(SaveMode.Overwrite)
.parquet(s"$location$org/$corrId/")
Run Code Online (Sandbox Code Playgroud)
这个特定的数据框包含大约 30GB 的数据、2000 个帐户和 30 个标记。账户和标记的分布接近均匀。我尝试使用亚马逊 r4.8xlarge 的 5 个核心节点和 1 个主节点驱动程序(220+ GB 内存),并使用默认的最大化资源分配设置(其中 2 个核心用于执行程序,大约 165 GB 内存)。我还明确将核心数、执行器数设置为不同的数字,但遇到了相同的问题。当查看 Ganglia 时,我没有看到任何过多的内存消耗。
因此,根本原因很可能是随机播放时可能发生的 2gb ByteArrayBuffer 问题。然后,我尝试使用各种数字(例如 100、500、1000、3000、5000 和 10000)重新分区数据帧,但没有成功。该作业偶尔会记录堆空间错误,但大多数情况下都会给出节点丢失错误。当查看各个节点日志时,它似乎突然失败,没有任何问题迹象(对于某些 oom 异常,这并不奇怪)。
对于数据帧写入,partitionBy 是否有技巧可以传递内存堆空间错误?
我见过这个问题的不同版本,但我还没有找到令人满意的答案。基本上,我想做从 keras model.to_json()
、model.get_weights()
、model.from_json()
、model.set_weights()
到tensorflow 的等效操作。我想我已经接近目标了,但我却陷入了困境。我更希望能够在同一个字符串中获得权重和图表,但我知道这是否不可能。
目前,我所拥有的是:
g = optimizer.minimize(loss_op,
global_step=tf.train.get_global_step())
de = g.graph.as_graph_def()
json_string = json_format.MessageToJson(de)
gd = tf.GraphDef()
gd = json_format.Parse(json_string, gd)
Run Code Online (Sandbox Code Playgroud)
这似乎很好地创建了图,但显然元图不包含变量、权重等。还有元图,但我唯一看到的是export_meta_graph,它似乎没有以相同的方式序列化。我看到 MetaGraph 有一个原型函数,但我不知道如何序列化这些变量。
简而言之,如何获取张量流模型(权重、图形等模型),将其序列化为字符串(最好是 json),然后反序列化并继续训练或提供预测。
以下是让我接近目标并且我已经尝试过的事情,但大多数情况下在需要写入磁盘方面都有限制,在这种情况下我无法做到这一点:
所以这个问题一直让我感到疯狂,并且开始感觉像s3的火花并不是这项特定工作的正确工具.基本上,我在s3存储桶中有数百万个较小的文件.由于我无法进入的原因,这些文件无法合并(一个是唯一的加密转录本).我见过类似的问题,每一个解决方案都没有产生好的结果.我尝试的第一件事是外卡:
sc.wholeTextFiles(s3aPath + "/*/*/*/*.txt").count();
Run Code Online (Sandbox Code Playgroud)
注意:计数更多地调试处理文件所需的时间.这项工作几乎花了一整天时间超过10个实例,但仍未通过列表底部发布的错误.然后我找到了这个链接,它基本上说这不是最佳的:https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from- S3-my.html
然后,我决定尝试另一种我目前无法找到的解决方案,即加载所有路径,然后联合所有rdds
ObjectListing objectListing = s3Client.listObjects(bucket);
List<JavaPairRDD<String, String>> rdds = new ArrayList<>();
List<JavaPairRDD<String, String>> tempMeta = new ArrayList<>();
//initializes objectListing
tempMeta.addAll(objectListing.getObjectSummaries().stream()
.map(func)
.filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
.map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
.collect(Collectors.toList()));
while(objectListing.isTruncated()) {
objectListing = s3Client.listNextBatchOfObjects(objectListing);
tempMeta.addAll(objectListing.getObjectSummaries().stream()
.map(func)
.filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
.map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
.collect(Collectors.toList()));
if (tempMeta.size() > 5000) {
rdds.addAll(tempMeta);
tempMeta = new …
Run Code Online (Sandbox Code Playgroud)