在规范化大型PySpark数据帧时,CodeGen增长超过64 KB

Mar*_*kNS 5 window-functions apache-spark apache-spark-sql pyspark pyspark-sql

我有一个包含1300万行和800列的PySpark数据帧.我需要对这些数据进行规范化,因此一直使用此代码,该代码适用于较小的开发数据集.

def z_score_w(col, w):
    avg_ = avg(col).over(w)
    stddev_ = stddev_pop(col).over(w)
    return (col - avg_) / stddev_

w = Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)    
norm_exprs = [z_score_w(signalsDF[x], w).alias(x) for x in signalsDF.columns]

normDF = signalsDF.select(norm_exprs)
Run Code Online (Sandbox Code Playgroud)

但是,在使用完整数据集时,我遇到了codegen的异常:

        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893
)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
        at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        ... 44 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.
spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection" grows beyond 64 KB
        at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
        at org.codehaus.janino.CodeContext.write(CodeContext.java:836)
        at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251)
        at org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:8933)
        at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4346)
        at org.codehaus.janino.UnitCompiler.access$7100(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$10.visitBooleanLiteral(UnitCompiler.java:3267)
Run Code Online (Sandbox Code Playgroud)

周围有一些Spark JIRA问题看似相似,但这些问题都标记为已解决.还有这个SO问题是相关的,但答案是另一种技术.

我有自己的解决方法,我规范化数据帧的批次列.这是有效的,但我最终得到了多个数据帧,然后我必须加入,这很慢.

所以,我的问题是 - 是否有一种替代技术可以规范化我丢失的大型数据帧?

我正在使用spark-2.0.1.

zer*_*323 4

一个明显的问题是使用窗口函数的方式。如下框架:

Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)    
Run Code Online (Sandbox Code Playgroud)

在实践中有点没用。如果没有分区列,它首先将所有数据重新排列到单个分区。此缩放方法仅适用于按组执行缩放。

Spark 提供了两个可用于扩展功能的类:

  • pyspark.ml.feature.StandardScaler
  • pyspark.mllib.feature.StandardScaler

不幸的是,两者都需要Vector数据作为输入。借助机器学习

Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)    
Run Code Online (Sandbox Code Playgroud)

scaled如果您需要原始形状,则需要进一步扩展该列。

使用 MLlib:

from pyspark.ml.feature import StandardScaler as MLScaler, VectorAssembler
from pyspark.ml import Pipeline

scaled = Pipeline(stages=[
    VectorAssembler(inputCols=df.columns, outputCol="features"), 
    MLScaler(withMean=True, inputCol="features", outputCol="scaled")
]).fit(df).transform(df).select("scaled")
Run Code Online (Sandbox Code Playgroud)

如果存在与列数相关的代码生成问题,后一种方法可能更有用。

解决此问题的另一种方法是计算全局统计数据

from pyspark.mllib.feature import StandardScaler as MLLibScaler
from pyspark.mllib.linalg import DenseVector

rdd = df.rdd.map(DenseVector)
scaler = MLLibScaler(withMean=True, withStd=True)

scaler.fit(rdd).transform(rdd).map(lambda v: v.array.tolist()).toDF(df.columns)
Run Code Online (Sandbox Code Playgroud)

并选择:

from pyspark.sql.functions import avg, col, stddev_pop, struct

stats = df.agg(*[struct(avg(c), stddev_pop(c)) for c in df.columns]).first()
Run Code Online (Sandbox Code Playgroud)

根据您的评论,您认为可以使用 NumPy 和一些基本转换来表达最简单的解决方案:

df.select(*[
    ((col(c) - mean) / std).alias(c)
    for (c, (mean, std)) in zip(df.columns, stats)
])
Run Code Online (Sandbox Code Playgroud)

并转换回DataFrame

rdd = df.rdd.map(np.array)  # Convert to RDD of NumPy vectors
stats = rdd.stats()  # Compute mean and std
scaled = rdd.map(lambda v: (v - stats.mean()) / stats.stdev())  # Normalize
Run Code Online (Sandbox Code Playgroud)