Spark-land中有几个相似但又不同的概念,围绕着如何将工作分配到不同的节点并同时执行.具体来说,有:
sparkDriverCount
)numWorkerNodes
)numExecutors
)dataFrame
)dataFrame
(numDFRows
)中的行数dataFrame
(numPartitions
)上的分区数numCpuCoresPerWorker
)我相信所有Spark集群都有一个且只有一个 Spark Driver,然后是0+个工作节点.如果我错了,请先纠正我!假设我或多或少是正确的,让我们在这里锁定一些变量.假设我们有一个带有1个驱动程序和4个工作节点的Spark集群,每个工作节点上有4个CPU核心(因此总共有16个CPU核心).所以这里的"给定"是:
sparkDriverCount = 1
numWorkerNodes = 4
numCpuCores = numWorkerNodes * numCpuCoresPerWorker = 4 * 4 = 16
Run Code Online (Sandbox Code Playgroud)
鉴于作为设置,我想知道如何确定一些事情.特别:
numWorkerNodes
和之间有什么关系numExecutors
?是否有一些已知/普遍接受的工人与遗嘱执行人的比例?有没有办法确定numExecutors
给定numWorkerNodes
(或任何其他输入)?numDFRows
为numPartitions
?如何根据dataFrame
?的大小计算"最佳"分区数?numPartitions = numWorkerNodes * numCpuCoresPerWorker
那有什么道理吗?换句话说,它规定每个CPU核心应该有一个分区.partitioning distributed-computing bigdata apache-spark spark-dataframe
我正在尝试使用spark进行一些简单的机器学习任务.我使用pyspark和spark 1.2.0来做一个简单的逻辑回归问题.我有120万条培训记录,我记录了记录的功能.当我将散列函数的数量设置为1024时,程序运行正常,但是当我将散列函数的数量设置为16384时,程序会多次失败并出现以下错误:
Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) …
Run Code Online (Sandbox Code Playgroud) python java distributed-computing logistic-regression apache-spark
KMeans有几个参数用于训练,初始化模式默认为kmeans ||.问题是它快速(少于10分钟)前进到前13个阶段,然后完全挂起,不会产生错误!
再现问题的最小示例(如果我使用1000点或随机初始化,它将成功):
from pyspark.context import SparkContext
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.random import RandomRDDs
if __name__ == "__main__":
sc = SparkContext(appName='kmeansMinimalExample')
# same with 10000 points
data = RandomRDDs.uniformVectorRDD(sc, 10000000, 64)
C = KMeans.train(data, 8192, maxIterations=10)
sc.stop()
Run Code Online (Sandbox Code Playgroud)
这项工作什么都不做(它没有成功,失败或进展......),如下所示."执行者"选项卡中没有活动/失败的任务.Stdout和Stderr Logs没有特别有趣的东西:
如果我使用k=81
,而不是8192,它将成功:
请注意,这两个电话takeSample()
,不应该是一个问题,因为有在随机初始化的情况下打了两次电话.
那么,发生了什么?Spark的Kmeans 无法扩展吗?有人知道吗?你可以重现吗?
如果这是一个内存问题,我会像以前一样得到警告和错误.
注意:placeybordeaux的注释基于在客户端模式下执行作业,其中驱动程序的配置无效,导致退出代码143等(请参阅编辑历史记录),而不是群集模式,其中根本没有报告错误,应用程序只是挂起.
从零到323:为什么Spark Mllib KMeans算法非常慢?是相关的,但我认为他目睹了一些进展,而我的确悬而未决,我确实发表评论......