小编RKD*_*314的帖子

使用Pyspark计算Spark数据帧的每列中的非NaN条目数

我有一个非常大的数据集,在Hive中加载.它由大约190万行和1450列组成.我需要确定每个列的"覆盖率",即每个列具有非NaN值的行的分数.

这是我的代码:

from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string

sc = SparkContext(appName="compute_coverages") ## Create the context
sqlContext = HiveContext(sc)

df = sqlContext.sql("select * from data_table")
nrows_tot = df.count()

covgs=sc.parallelize(df.columns)
        .map(lambda x: str(x))
        .map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))
Run Code Online (Sandbox Code Playgroud)

在pyspark shell中尝试这个,如果我然后执行covgs.take(10),它会返回一个相当大的错误堆栈.它说保存文件时出现问题/usr/lib64/python2.6/pickle.py.这是错误的最后一部分:

py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
        at py4j.Gateway.invoke(Gateway.java:252)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

如果有更好的方法来实现这一点,而不是我正在尝试的方式,我愿意接受建议.我不能使用pandas,因为它目前在我工作的集群上不可用,我无权安装它.

python dataframe apache-spark apache-spark-sql pyspark

29
推荐指数
1
解决办法
3万
查看次数

计算PySpark DataFrame列的模式?

最终我想要的是DataFrame中所有列的列模式.对于其他摘要统计,我看到了几个选项:使用DataFrame聚合,或将DataFrame的列映射到向量的RDD(我也遇到了麻烦)并使用colStatsMLlib.但我不认为模式是一种选择.

python apache-spark apache-spark-sql pyspark

6
推荐指数
2
解决办法
6059
查看次数

自定义模块的功能在PySpark中不起作用,但在交互模式下输入时它们起作用

我有一个模块,我写的包含作用于PySpark DataFrames的函数.他们对DataFrame中的列进行转换,然后返回一个新的DataFrame.以下是代码的示例,缩写为仅包含其中一个函数:

from pyspark.sql import functions as F
from pyspark.sql import types as t

import pandas as pd
import numpy as np

metadta=pd.DataFrame(pd.read_csv("metadata.csv"))  # this contains metadata on my dataset

def str2num(text):
    if type(text)==None or text=='' or text=='NULL' or text=='null':
        return 0
    elif len(text)==1:
        return ord(text)
    else:
        newnum=''
        for lettr in text:
            newnum=newnum+str(ord(lettr))
        return int(newnum)

str2numUDF = F.udf(lambda s: str2num(s), t.IntegerType())

def letConvNum(df):    # df is a PySpark DataFrame
    #Get a list of columns that I want to transform, using …
Run Code Online (Sandbox Code Playgroud)

pyspark pyspark-sql

6
推荐指数
2
解决办法
4940
查看次数