KryoException:缓冲区溢出,输入非常小

Joe*_*Joe 10 apache-spark

我有一个琐碎的火花计划.我已将输入修剪为一个文件,其中包含一行.所以我相信这不是传统的记忆压力.

Exception in thread "main" com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 32749568, required: 34359296
    at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at carbonite.serializer$write_map.invoke(serializer.clj:69)
Run Code Online (Sandbox Code Playgroud)

我可以设定spark.kryoserializer.buffer.mb,但我想我只是推迟了这个问题.我想了解它.

我不认为该计划有任何不规范之处.如果我删除一行(看似随机),则错误消失.

看起来我正在达到某种固定限制.但事实上,我的输入文件非常小,我正在做的唯一操作是可预测的maps,reduceByKey我怀疑还有其他的东西.

我正在使用Flambo Clojure 0.4.0库(但我不认为这会导致它)和Spark Core 2.10.

这是最小的工作示例.对不起,它有点神秘,但我删除了一切无关紧要的东西.

(ns mytest.core
  (:require [flambo.conf :as conf])
  (:require [flambo.api :as f]))

(def sc (f/spark-context (-> (conf/spark-conf)
           (conf/master "local")
           (conf/app-name "test")
           (conf/set "spark.driver.memory" "1g")
           (conf/set "spark.executor.memory" "1g"))))

(defn -main

  [& args]
  (let [logfile (f/text-file sc "file://tmp/one-line-file")
        a (f/map logfile (f/fn [u] nil))
        b (f/map logfile (f/fn [u] nil))
        c (f/map logfile (f/fn [u] nil))
        d (f/map logfile (f/fn [u] nil))
        e (f/map logfile (f/fn [u] nil))
        g (f/map logfile (f/fn [u] nil))
        h (f/map logfile (f/fn [u] nil))
        i (f/map logfile (f/fn [u] nil))
        j (f/map logfile (f/fn [u] nil))
        k (f/map logfile (f/fn [u] nil))
        l (f/map logfile (f/fn [u] nil))
        m (f/map logfile (f/fn [u] nil))
        n (f/map logfile (f/fn [u] nil))  
        o (f/map logfile (f/fn [u] nil))
        p (f/map logfile (f/fn [u] nil))
        q (f/map logfile (f/fn [u] nil))
        r (f/map logfile (f/fn [u] nil))
        s (f/map logfile (f/fn [u] nil))
        t (f/map logfile (f/fn [u] nil))
]))
Run Code Online (Sandbox Code Playgroud)

编辑

如果我将其拆分为两个块并重新创建惰性文件流,它可以工作:

(defn get-inputs []
  (f/text-file sc "file://tmp/one-line-file"))

(defn -main

  [& args]
  (let [logfile (get-inputs)
        a (f/map logfile (f/fn [u] nil))
        b (f/map logfile (f/fn [u] nil))
        c (f/map logfile (f/fn [u] nil))
        d (f/map logfile (f/fn [u] nil))
        e (f/map logfile (f/fn [u] nil))
        g (f/map logfile (f/fn [u] nil))
        h (f/map logfile (f/fn [u] nil))
        i (f/map logfile (f/fn [u] nil))])

  (let [logfile (get-inputs)
        j (f/map logfile (f/fn [u] nil))
        k (f/map logfile (f/fn [u] nil))
        l (f/map logfile (f/fn [u] nil))
        m (f/map logfile (f/fn [u] nil))
        n (f/map logfile (f/fn [u] nil))  
        o (f/map logfile (f/fn [u] nil))
        p (f/map logfile (f/fn [u] nil))
        q (f/map logfile (f/fn [u] nil))
        r (f/map logfile (f/fn [u] nil))
        s (f/map logfile (f/fn [u] nil))
        t (f/map logfile (f/fn [u] nil))]))
Run Code Online (Sandbox Code Playgroud)

在Java中,这相当于创建两个本地范围(例如,两个单独的方法).并且get-inputs只是一个返回新构造的文本文件对象的方法.

我认为该textFile方法会创建一个可以(重新)读取多次的惰性流,因此这两个示例应该没有太大区别.

小智 4

将其添加到您的 Spark 上下文 conf 中:

conf.set("spark.kryoserializer.buffer.mb","128")
Run Code Online (Sandbox Code Playgroud)

  • 谢谢,我已经这样做了,但这并不能回答我的问题。为什么单个文件的缓冲区会耗尽空间? (3认同)