我试图覆盖spark会话/ spark上下文默认配置,但它正在挑选整个节点/群集资源.
spark = SparkSession.builder
.master("ip")
.enableHiveSupport()
.getOrCreate()
spark.conf.set("spark.executor.memory", '8g')
spark.conf.set('spark.executor.cores', '3')
spark.conf.set('spark.cores.max', '3')
spark.conf.set("spark.driver.memory",'8g')
sc = spark.sparkContext
Run Code Online (Sandbox Code Playgroud)
当我将配置放入spark提交时,它工作正常
spark-submit --master ip --executor-cores=3 --diver 10G code.py
Run Code Online (Sandbox Code Playgroud) 我正在开发 5 节点集群,每个集群 7 核,每个节点 25GB。我当前的执行使用 1-2GB 输入数据,我能知道为什么我会遇到以下错误吗?我使用 pyspark 数据框(火花 1.6.2)
[Stage 9487:===================================================>(198 + 2) / 200]16/08/13 16:43:18 ERROR TaskSchedulerImpl: Lost executor 3 on server05: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[Stage 9487:=================================================>(198 + -49) / 200]16/08/13 16:43:19 ERROR TaskSchedulerImpl: Lost executor 1 on server04: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[Stage 9487:=========> (24 …Run Code Online (Sandbox Code Playgroud) 我想从 mappartition 向 python 函数传递一些额外的参数。有什么建议..
我的示例代码如下所示
def test(x,abc):
<<code>>
abc =1234
df = df.repartition("key")
res= df.rdd.mapPartitions(test, abc)
Run Code Online (Sandbox Code Playgroud)
如果我传递 abc 作为参数并在测试函数中使用它,我会收到以下错误
例外:您似乎正在尝试广播 RDD 或从操作或转换引用 RDD。RDD 转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(lambda x: rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。有关更多信息,请参阅 SPARK-5063。
Mariusz 请找到零钱
from pyspark.sql import Row
def test(abc):
def my_map_partitions(x):
print("----------start-----------")
cnt=1
ret = []
for i in x:
cnt=cnt+1
val = Row(key1=i.key1, key2=i.key2, cnt=cnt)
ret.append(val)
return ret
return my_map_partitions
df = df.repartition("key1key2").sortWithinPartitions("key1key2")
abc123 = df .rdd.mapPartitions(test(abc))
Run Code Online (Sandbox Code Playgroud) 我有一个拥有1亿行和5000多列的DF.我试图找到colx和剩余5000+列之间的corr.
aggList1 = [mean(col).alias(col + '_m') for col in df.columns] #exclude keys
df21= df.groupBy('key1', 'key2', 'key3', 'key4').agg(*aggList1)
df = df.join(broadcast(df21),['key1', 'key2', 'key3', 'key4']))
df= df.select([func.round((func.col(colmd) - func.col(colmd + '_m')), 8).alias(colmd)\
for colmd in all5Kcolumns])
aggCols= [corr(colx, col).alias(col) for col in colsall5K]
df2 = df.groupBy('key1', 'key2', 'key3').agg(*aggCols)
Run Code Online (Sandbox Code Playgroud)
现在因为Spark 64KB codegen问题(甚至是火花2.2)而无法正常工作.所以我循环每个300列并在最后合并所有列.但是在具有40个节点的集群中需要超过30个小时(每个节点10个核心,每个节点100GB).有什么帮助来调整这个?
下面已经尝试过 - 重新将DF分区为10,000 - 每个循环中的检查点 - 在每个循环中缓存