标签: pyspark

如何在pyspark中获取dataframe列的名称?

在pandas中,这可以通过column.name来完成.

但是当它的火花数据帧列如何做同样的事情?

例如,调用程序有一个spark数据帧:spark_df

>>> spark_df.columns
['admit', 'gre', 'gpa', 'rank']
Run Code Online (Sandbox Code Playgroud)

这个程序调用我的函数:my_function(spark_df ['rank'])在my_function中,我需要列的名称即'rank'

如果是pandas数据帧,我们可以在my_function中使用

>>> pandas_df['rank'].name
'rank'
Run Code Online (Sandbox Code Playgroud)

pyspark pyspark-sql

30
推荐指数
4
解决办法
6万
查看次数

Pyspark:在UDF中传递多个列

我正在编写一个用户定义的函数,它将获取除数据帧中第一个之外的所有列并进行求和(或任何其他操作).现在数据框有时可以有3列或4列或更多列.它会有所不同.

我知道我可以硬编码4个列名作为UDF传递,但在这种情况下它会有所不同所以我想知道如何完成它?

以下是第一个示例中的两个示例,我们有两列要添加,第二个示例中我们有三列要添加.

在此输入图像描述

apache-spark pyspark spark-dataframe

30
推荐指数
4
解决办法
3万
查看次数

collect_list通过保留基于另一个变量的顺序

我正在尝试使用现有列集上的groupby聚合在Pyspark中创建新列表.下面提供了一个示例输入数据框:

------------------------
id | date        | value
------------------------
1  |2014-01-03   | 10 
1  |2014-01-04   | 5
1  |2014-01-05   | 15
1  |2014-01-06   | 20
2  |2014-02-10   | 100   
2  |2014-03-11   | 500
2  |2014-04-15   | 1500
Run Code Online (Sandbox Code Playgroud)

预期的产出是:

id | value_list
------------------------
1  | [10, 5, 15, 20]
2  | [100, 500, 1500]
Run Code Online (Sandbox Code Playgroud)

列表中的值按日期排序.

我尝试使用collect_list如下:

from pyspark.sql import functions as F
ordered_df = input_df.orderBy(['id','date'],ascending = True)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))
Run Code Online (Sandbox Code Playgroud)

但即使我在聚合之前按日期对输入数据框进行排序,collect_list也不保证顺序.

有人可以通过保留基于第二个(日期)变量的订单来帮助如何进行聚合吗?

python apache-spark pyspark

30
推荐指数
6
解决办法
2万
查看次数

使用Pyspark计算Spark数据帧的每列中的非NaN条目数

我有一个非常大的数据集,在Hive中加载.它由大约190万行和1450列组成.我需要确定每个列的"覆盖率",即每个列具有非NaN值的行的分数.

这是我的代码:

from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string

sc = SparkContext(appName="compute_coverages") ## Create the context
sqlContext = HiveContext(sc)

df = sqlContext.sql("select * from data_table")
nrows_tot = df.count()

covgs=sc.parallelize(df.columns)
        .map(lambda x: str(x))
        .map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))
Run Code Online (Sandbox Code Playgroud)

在pyspark shell中尝试这个,如果我然后执行covgs.take(10),它会返回一个相当大的错误堆栈.它说保存文件时出现问题/usr/lib64/python2.6/pickle.py.这是错误的最后一部分:

py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
        at py4j.Gateway.invoke(Gateway.java:252)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

如果有更好的方法来实现这一点,而不是我正在尝试的方式,我愿意接受建议.我不能使用pandas,因为它目前在我工作的集群上不可用,我无权安装它.

python dataframe apache-spark apache-spark-sql pyspark

29
推荐指数
1
解决办法
3万
查看次数

将JSON文件读入Spark时出现_corrupt_record错误

我有这个JSON文件

{
    "a": 1, 
    "b": 2
}
Run Code Online (Sandbox Code Playgroud)

这是用Python json.dump方法获得的.现在,我想使用pyspark将此文件读入Spark中的DataFrame.以下文档,我正在这样做

sc = SparkContext()

sqlc = SQLContext(sc)

df = sqlc.read.json('my_file.json')

print df.show()

print语句虽然吐出了这个:

+---------------+
|_corrupt_record|
+---------------+
|              {|
|       "a": 1, |
|         "b": 2|
|              }|
+---------------+
Run Code Online (Sandbox Code Playgroud)

任何人都知道发生了什么以及为什么它没有正确解释文件?

python json dataframe pyspark

29
推荐指数
3
解决办法
2万
查看次数

PySpark:when子句中的多个条件

我想修改数据帧列(Age)的单元格值,其中当前它是空白的,我只会在另一列(Survived)的值为0时为相应的行进行修改,其中Age为空白.如果它在Survived列中为1但在Age列中为空,那么我将它保持为null.

我试图使用&&运算符,但它没有用.这是我的代码:

tdata.withColumn("Age",  when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
Run Code Online (Sandbox Code Playgroud)

任何建议如何处理?谢谢.

错误信息:

SyntaxError: invalid syntax
  File "<ipython-input-33-3e691784411c>", line 1
    tdata.withColumn("Age",  when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
                                                    ^
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

29
推荐指数
2
解决办法
8万
查看次数

PySpark:withColumn()有两个条件和三个结果

我正在使用Spark和PySpark.我试图实现相当于以下伪代码的结果:

df = df.withColumn('new_column', 
    IF fruit1 == fruit2 THEN 1, ELSE 0. IF fruit1 IS NULL OR fruit2 IS NULL 3.)
Run Code Online (Sandbox Code Playgroud)

我试图在PySpark中这样做,但我不确定语法.有什么指针吗?我调查expr()但无法让它工作.

请注意,这df是一个pyspark.sql.dataframe.DataFrame.

hive hiveql apache-spark apache-spark-sql pyspark

29
推荐指数
3
解决办法
9万
查看次数

通过以字符串格式减去两个日期时间列来计算持续时间

我有一个Spark Dataframe,其中包含一系列日期:

from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
import pandas as pd

rdd = sc.parallelizesc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'),
                                    ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'),
                                    ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),
                                    ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'),
                                    ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)

我想做的是duration通过减去EndDateTime和找到StartDateTime.我想我会尝试使用函数执行此操作:

# Function to calculate time delta
def time_delta(y,x): 
    end = pd.to_datetime(y)
    start = pd.to_datetime(x)
    delta = (end-start)
    return delta

# create new RDD and add new column 'Duration' by applying …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

28
推荐指数
3
解决办法
5万
查看次数

查看Spark Dataframe列的内容

我正在使用Spark 1.3.1.

我试图在Python中查看Spark数据帧列的值.使用Spark数据帧,我可以df.collect()查看数据帧的内容,但是我没有看到Spark数据帧列的最佳方法.

例如,数据框df包含一个名为的列'zip_code'.所以我可以做df['zip_code'],它变成了一个pyspark.sql.dataframe.Column类型,但我找不到一种方法来查看值df['zip_code'].

python dataframe apache-spark pyspark

28
推荐指数
3
解决办法
5万
查看次数

pyspark数据帧过滤器或基于列表包含

我正在尝试使用列表过滤pyspark中的数据帧.我想要根据列表进行过滤,或者仅包含列表中具有值的记录.我的代码不起作用:

# define a dataframe
rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])

# define a list of scores
l = [10,18,20]

# filter out records by scores by list l
records = df.filter(df.score in l)
# expected: (0,1), (0,1), (0,2), (1,2)

# include only records with these scores in list l
records = df.where(df.score in l)
# expected: (1,10), (1,20), (3,18), (3,18), (3,18)
Run Code Online (Sandbox Code Playgroud)

给出以下错误:ValueError:无法将列转换为bool:请使用'&'代表'和','|' 对于'或','〜'表示构建DataFrame布尔表达式时的'not'.

filter apache-spark apache-spark-sql pyspark

28
推荐指数
3
解决办法
4万
查看次数