我有一个琐碎的火花计划.我已将输入修剪为一个文件,其中包含一行.所以我相信这不是传统的记忆压力.
Exception in thread "main" com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 32749568, required: 34359296
at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at carbonite.serializer$write_map.invoke(serializer.clj:69)
Run Code Online (Sandbox Code Playgroud)
我可以设定spark.kryoserializer.buffer.mb,但我想我只是推迟了这个问题.我想了解它.
我不认为该计划有任何不规范之处.如果我删除一行(看似随机),则错误消失.
看起来我正在达到某种固定限制.但事实上,我的输入文件非常小,我正在做的唯一操作是可预测的maps,reduceByKey我怀疑还有其他的东西.
我正在使用Flambo Clojure 0.4.0库(但我不认为这会导致它)和Spark Core 2.10.
这是最小的工作示例.对不起,它有点神秘,但我删除了一切无关紧要的东西.
(ns mytest.core
(:require [flambo.conf :as conf])
(:require [flambo.api :as f]))
(def sc (f/spark-context (-> (conf/spark-conf)
(conf/master "local")
(conf/app-name "test")
(conf/set "spark.driver.memory" "1g")
(conf/set "spark.executor.memory" "1g"))))
(defn -main
[& args]
(let [logfile (f/text-file sc "file://tmp/one-line-file")
a (f/map logfile (f/fn [u] nil))
b (f/map logfile (f/fn [u] nil))
c (f/map logfile (f/fn [u] nil))
d (f/map logfile (f/fn [u] nil))
e (f/map logfile (f/fn [u] nil))
g (f/map logfile (f/fn [u] nil))
h (f/map logfile (f/fn [u] nil))
i (f/map logfile (f/fn [u] nil))
j (f/map logfile (f/fn [u] nil))
k (f/map logfile (f/fn [u] nil))
l (f/map logfile (f/fn [u] nil))
m (f/map logfile (f/fn [u] nil))
n (f/map logfile (f/fn [u] nil))
o (f/map logfile (f/fn [u] nil))
p (f/map logfile (f/fn [u] nil))
q (f/map logfile (f/fn [u] nil))
r (f/map logfile (f/fn [u] nil))
s (f/map logfile (f/fn [u] nil))
t (f/map logfile (f/fn [u] nil))
]))
Run Code Online (Sandbox Code Playgroud)
编辑
如果我将其拆分为两个块并重新创建惰性文件流,它可以工作:
(defn get-inputs []
(f/text-file sc "file://tmp/one-line-file"))
(defn -main
[& args]
(let [logfile (get-inputs)
a (f/map logfile (f/fn [u] nil))
b (f/map logfile (f/fn [u] nil))
c (f/map logfile (f/fn [u] nil))
d (f/map logfile (f/fn [u] nil))
e (f/map logfile (f/fn [u] nil))
g (f/map logfile (f/fn [u] nil))
h (f/map logfile (f/fn [u] nil))
i (f/map logfile (f/fn [u] nil))])
(let [logfile (get-inputs)
j (f/map logfile (f/fn [u] nil))
k (f/map logfile (f/fn [u] nil))
l (f/map logfile (f/fn [u] nil))
m (f/map logfile (f/fn [u] nil))
n (f/map logfile (f/fn [u] nil))
o (f/map logfile (f/fn [u] nil))
p (f/map logfile (f/fn [u] nil))
q (f/map logfile (f/fn [u] nil))
r (f/map logfile (f/fn [u] nil))
s (f/map logfile (f/fn [u] nil))
t (f/map logfile (f/fn [u] nil))]))
Run Code Online (Sandbox Code Playgroud)
在Java中,这相当于创建两个本地范围(例如,两个单独的方法).并且get-inputs只是一个返回新构造的文本文件对象的方法.
我认为该textFile方法会创建一个可以(重新)读取多次的惰性流,因此这两个示例应该没有太大区别.
小智 4
将其添加到您的 Spark 上下文 conf 中:
conf.set("spark.kryoserializer.buffer.mb","128")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5803 次 |
| 最近记录: |