DF中每组的pyspark corr(超过5K列)

Har*_*ish 5 dataframe python-3.x apache-spark apache-spark-sql pyspark

我有一个拥有1亿行和5000多列的DF.我试图找到colx和剩余5000+列之间的corr.

aggList1 =  [mean(col).alias(col + '_m') for col in df.columns]  #exclude keys
df21= df.groupBy('key1', 'key2', 'key3', 'key4').agg(*aggList1)
df = df.join(broadcast(df21),['key1', 'key2', 'key3', 'key4']))
df= df.select([func.round((func.col(colmd) - func.col(colmd + '_m')), 8).alias(colmd)\
                     for colmd in all5Kcolumns])


aggCols= [corr(colx, col).alias(col) for col in colsall5K]
df2 = df.groupBy('key1', 'key2', 'key3').agg(*aggCols)
Run Code Online (Sandbox Code Playgroud)

现在因为Spark 64KB codegen问题(甚至是火花2.2)而无法正常工作.所以我循环每个300列并在最后合并所有列.但是在具有40个节点的集群中需要超过30个小时(每个节点10个核心,每个节点100GB).有什么帮助来调整这个?

下面已经尝试过 - 重新将DF分区为10,000 - 每个循环中的检查点 - 在每个循环中缓存

zer*_*323 2

您可以尝试使用一些 NumPy 和 RDD。首先是一堆进口:

from operator import itemgetter
import numpy as np
from pyspark.statcounter import StatCounter
Run Code Online (Sandbox Code Playgroud)

让我们定义几个变量:

keys = ["key1", "key2", "key3"] # list of key column names
xs = ["x1", "x2", "x3"]    # list of column names to compare
y = "y"                         # name of the reference column
Run Code Online (Sandbox Code Playgroud)

还有一些帮助者:

def as_pair(keys, y, xs):
    """ Given key names, y name, and xs names
    return a tuple of key, array-of-values"""
    key = itemgetter(*keys)
    value = itemgetter(y, * xs)  # Python 3 syntax

    def as_pair_(row):
        return key(row), np.array(value(row))
    return as_pair_

def init(x):
    """ Init function for combineByKey
    Initialize new StatCounter and merge first value"""
    return StatCounter().merge(x)

def center(means):
    """Center a row value given a 
    dictionary of mean arrays
    """
    def center_(row):
        key, value = row
        return key, value - means[key]
    return center_

def prod(arr):
    return arr[0] * arr[1:]

def corr(stddev_prods):
    """Scale the row to get 1 stddev 
    given a dictionary of stddevs
    """
    def corr_(row):
        key, value = row
        return key, value / stddev_prods[key]
    return corr_
Run Code Online (Sandbox Code Playgroud)

并转换DataFrameRDDof 对:

pairs = df.rdd.map(as_pair(keys, y, xs))
Run Code Online (Sandbox Code Playgroud)

接下来让我们计算每组的统计数据:

stats = (pairs
    .combineByKey(init, StatCounter.merge, StatCounter.mergeStats)
    .collectAsMap())

means = {k: v.mean() for k, v in stats.items()}
Run Code Online (Sandbox Code Playgroud)

注意:对于 5000 个特征和 7000 个组,将此结构保留在内存中应该没有问题。对于较大的数据集,您可能必须使用 RDD,join但这会更慢。

将数据居中:

centered = pairs.map(center(means))
Run Code Online (Sandbox Code Playgroud)

计算协方差:

covariance = (centered
    .mapValues(prod)
    .combineByKey(init, StatCounter.merge, StatCounter.mergeStats)
    .mapValues(StatCounter.mean))
Run Code Online (Sandbox Code Playgroud)

最后是相关性:

stddev_prods = {k: prod(v.stdev()) for k, v in stats.items()}

correlations = covariance.map(corr(stddev_prods))
Run Code Online (Sandbox Code Playgroud)

示例数据:

df = sc.parallelize([
    ("a", "b", "c", 0.5, 0.5, 0.3, 1.0),
    ("a", "b", "c", 0.8, 0.8, 0.9, -2.0), 
    ("a", "b", "c", 1.5, 1.5, 2.9, 3.6),
    ("d", "e", "f", -3.0, 4.0, 5.0, -10.0),
    ("d", "e", "f", 15.0, -1.0, -5.0, 10.0),
]).toDF(["key1", "key2", "key3", "y", "x1", "x2", "x3"])
Run Code Online (Sandbox Code Playgroud)

结果DataFrame

df.groupBy(*keys).agg(*[corr(y, x) for x in xs]).show()
Run Code Online (Sandbox Code Playgroud)
from operator import itemgetter
import numpy as np
from pyspark.statcounter import StatCounter
Run Code Online (Sandbox Code Playgroud)

以及上面提供的方法:

correlations.collect()
Run Code Online (Sandbox Code Playgroud)
keys = ["key1", "key2", "key3"] # list of key column names
xs = ["x1", "x2", "x3"]    # list of column names to compare
y = "y"                         # name of the reference column
Run Code Online (Sandbox Code Playgroud)

这个解决方案虽然有点复杂,但非常有弹性,可以轻松调整以处理不同的数据分布。JIT 也应该可以得到进一步的推动。