ConcurrentModificationException when using Spark's collectionAccumulator

cod*_*box 8 scala azure hdinsight apache-spark

I am trying to run a Spark-based application on an Azure HDInsight on-demand cluster, and I am seeing many SparkExceptions (caused by ConcurrentModificationException) being logged. The application runs without these errors when I start a local Spark instance.

I have seen reports of similar errors when using accumulators, and my code does use a CollectionAccumulator. However, I have put synchronized blocks everywhere I use it, and it makes no difference. The accumulator-related code looks like this:

import scala.collection.JavaConverters._
import org.apache.spark.SparkContext

// presumably extends a base class not shown here, hence the `override`s
class MySparkClass(sc : SparkContext) {
    val myAccumulator = sc.collectionAccumulator[MyRecord]

    override def add(record: MyRecord): Unit = {
        synchronized {
            myAccumulator.add(record)
        }
    }

    override def endOfBatch(): Unit = {
        synchronized {
            // CollectionAccumulator.value returns a java.util.List, hence asScala
            myAccumulator.value.asScala.foreach((record: MyRecord) => {
                processIt(record)
            })
        }
    }
}

The exception does not cause the application to fail, but when endOfBatch is called and the code tries to read values from the accumulator, it is empty, and processIt is never called.

We are using HDInsight version 3.6 and Spark version 2.3.0.

18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList.writeObject(ArrayList.java:770)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:565)
    at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:231)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
    ... 13 more

The following code is a more self-contained example that reproduces the problem. MyRecord is a simple case class containing only numeric values. The code runs without errors locally, but produces the errors above on the HDInsight cluster.

import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

object MainDemo {
    def main(args: Array[String]): Unit = {
        val sparkContext = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
        val myAccumulator = sparkContext.collectionAccumulator[MyRecord]

        sparkContext.binaryFiles("/my/files/here").foreach(_ => {
            for (i <- 1 to 100000) {
                val record = MyRecord(i, 0, 0)
                myAccumulator.add(record)
            }
        })

        myAccumulator.value.asScala.foreach((record: MyRecord) => {
            // we expect this to be called once for each record that we 'add' above,
            // but it is never called
            println(record)
        })
    }
}
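One workaround that is sometimes suggested for this kind of heartbeat-serialization race (my own suggestion, not part of the original question) is a custom AccumulatorV2 backed by java.util.concurrent.CopyOnWriteArrayList, whose writeObject serializes a snapshot, so the executor heartbeat thread can serialize it while task threads are still calling add(). A minimal sketch, untested on HDInsight:

```scala
import java.util.concurrent.CopyOnWriteArrayList
import org.apache.spark.util.AccumulatorV2

// Hypothetical drop-in replacement for collectionAccumulator[MyRecord]:
// CopyOnWriteArrayList never throws ConcurrentModificationException,
// because iteration and serialization operate on an immutable snapshot.
class ThreadSafeListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
    private val list = new CopyOnWriteArrayList[T]()

    override def isZero: Boolean = list.isEmpty

    override def copy(): ThreadSafeListAccumulator[T] = {
        val acc = new ThreadSafeListAccumulator[T]
        acc.list.addAll(list)
        acc
    }

    override def reset(): Unit = list.clear()

    override def add(v: T): Unit = list.add(v)

    override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit =
        list.addAll(other.value)

    override def value: java.util.List[T] = list
}
```

It would need to be registered on the driver with `sparkContext.register(acc, "myAccumulator")` before use; each add() on a CopyOnWriteArrayList copies the backing array, so this trades write throughput for safe concurrent reads.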

H R*_*Roy 1

I doubt the synchronized blocks really help. Custom accumulators, like all other accumulators, are not thread-safe. They do not need to be, because the DAGScheduler.updateAccumulators method that the driver uses to update an accumulator's value after a task has finished (successfully or with a failure) runs only on the single thread that executes the scheduling loop. Beyond that, accumulators are write-only data structures from the workers' point of view: each worker has its own local reference to the accumulator, and only the driver is allowed to read the accumulator's value. It works in local mode because everything runs inside a single JVM, whereas in cluster mode the driver and executors are separate JVMs and Java processes, and RPC calls are made to communicate between them.
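The exception itself comes from ArrayList's fail-fast modCount check: the stack trace shows the heartbeat thread hitting it inside ArrayList.writeObject while task threads are still mutating the list. The same check can be demonstrated with plain JDK collections, no Spark required (a minimal sketch, not the original code):

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.util.Try

object FailFastDemo {
    // Mutating a list while iterating over it trips the same modCount
    // check that ArrayList.writeObject performs during serialization.
    def mutateWhileIterating(list: java.util.List[Int]): Boolean = Try {
        val it = list.iterator()
        while (it.hasNext) {
            it.next()
            list.add(99)
        }
    }.isFailure

    def main(args: Array[String]): Unit = {
        val plain = new java.util.ArrayList[Int]()
        (1 to 5).foreach(i => plain.add(i))
        // ArrayList's iterator is fail-fast: ConcurrentModificationException
        println(s"ArrayList failed: ${mutateWhileIterating(plain)}")

        val cow = new CopyOnWriteArrayList[Int]()
        (1 to 5).foreach(i => cow.add(i))
        // CopyOnWriteArrayList iterates over a snapshot: no exception
        println(s"CopyOnWriteArrayList failed: ${mutateWhileIterating(cow)}")
    }
}
```

The ArrayList run fails with a ConcurrentModificationException, while the CopyOnWriteArrayList run completes, which is why snapshot-based collections are a candidate fix for the heartbeat race.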

It would help to see what the MyRecord object looks like, and whether anything changes if you simply end the line with .value instead of using an iterator on it. Give it a try:

myAccumulator.value