小编Har*_*ish的帖子

spark 2.1.0 session config settings(pyspark)

我试图覆盖spark会话/ spark上下文默认配置,但它正在挑选整个节点/群集资源.

 spark  = SparkSession.builder
                      .master("ip")
                      .enableHiveSupport()
                      .getOrCreate()

 spark.conf.set("spark.executor.memory", '8g')
 spark.conf.set('spark.executor.cores', '3')
 spark.conf.set('spark.cores.max', '3')
 spark.conf.set("spark.driver.memory",'8g')
 sc = spark.sparkContext
Run Code Online (Sandbox Code Playgroud)

当我将配置放入spark提交时,它工作正常

spark-submit --master ip --executor-cores=3 --diver 10G code.py
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark spark-dataframe

31
推荐指数
2
解决办法
4万
查看次数

远程 RPC 客户端解除关联。可能是由于容器超过阈值或网络问题。检查驱动程序日志以获取 WARN 消息

我正在开发 5 节点集群,每个集群 7 核,每个节点 25GB。我当前的执行使用 1-2GB 输入数据,我能知道为什么我会遇到以下错误吗?我使用 pyspark 数据框(火花 1.6.2)

[Stage 9487:===================================================>(198 + 2) / 200]16/08/13 16:43:18 ERROR TaskSchedulerImpl: Lost executor 3 on server05: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[Stage 9487:=================================================>(198 + -49) / 200]16/08/13 16:43:19 ERROR TaskSchedulerImpl: Lost executor 1 on server04: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[Stage 9487:=========>                                          (24 …
Run Code Online (Sandbox Code Playgroud)

pyspark spark-dataframe

8
推荐指数
1
解决办法
1万
查看次数

带有额外参数的映射分区 pyspark

我想从 mappartition 向 python 函数传递一些额外的参数。有什么建议..

我的示例代码如下所示

 def test(x,abc):
   <<code>>

 abc =1234
 df = df.repartition("key")
 res= df.rdd.mapPartitions(test, abc)
Run Code Online (Sandbox Code Playgroud)

如果我传递 abc 作为参数并在测试函数中使用它,我会收到以下错误

例外:您似乎正在尝试广播 RDD 或从操作或转换引用 RDD。RDD 转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(lambda x: rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。有关更多信息,请参阅 SPARK-5063。

Mariusz 请找到零钱

from pyspark.sql import Row
def test(abc):
    def my_map_partitions(x):
       print("----------start-----------")
       cnt=1
       ret = []
       for i in x:
         cnt=cnt+1
         val = Row(key1=i.key1, key2=i.key2, cnt=cnt)
         ret.append(val)
       return ret 
    return my_map_partitions
df = df.repartition("key1key2").sortWithinPartitions("key1key2")  
abc123 = df .rdd.mapPartitions(test(abc)) 
Run Code Online (Sandbox Code Playgroud)

python pyspark

5
推荐指数
1
解决办法
5515
查看次数

DF中每组的pyspark corr(超过5K列)

我有一个拥有1亿行和5000多列的DF.我试图找到colx和剩余5000+列之间的corr.

aggList1 =  [mean(col).alias(col + '_m') for col in df.columns]  #exclude keys
df21= df.groupBy('key1', 'key2', 'key3', 'key4').agg(*aggList1)
df = df.join(broadcast(df21),['key1', 'key2', 'key3', 'key4']))
df= df.select([func.round((func.col(colmd) - func.col(colmd + '_m')), 8).alias(colmd)\
                     for colmd in all5Kcolumns])


aggCols= [corr(colx, col).alias(col) for col in colsall5K]
df2 = df.groupBy('key1', 'key2', 'key3').agg(*aggCols)
Run Code Online (Sandbox Code Playgroud)

现在因为Spark 64KB codegen问题(甚至是火花2.2)而无法正常工作.所以我循环每个300列并在最后合并所有列.但是在具有40个节点的集群中需要超过30个小时(每个节点10个核心,每个节点100GB).有什么帮助来调整这个?

下面已经尝试过 - 重新将DF分区为10,000 - 每个循环中的检查点 - 在每个循环中缓存

dataframe python-3.x apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
847
查看次数