在pandas中,这可以通过column.name来完成.
但是当它的火花数据帧列如何做同样的事情?
例如,调用程序有一个spark数据帧:spark_df
>>> spark_df.columns
['admit', 'gre', 'gpa', 'rank']
Run Code Online (Sandbox Code Playgroud)
这个程序调用我的函数:my_function(spark_df ['rank'])在my_function中,我需要列的名称即'rank'
如果是pandas数据帧,我们可以在my_function中使用
>>> pandas_df['rank'].name
'rank'
Run Code Online (Sandbox Code Playgroud) 我正在编写一个用户定义的函数,它将获取除数据帧中第一个之外的所有列并进行求和(或任何其他操作).现在数据框有时可以有3列或4列或更多列.它会有所不同.
我知道我可以硬编码4个列名作为UDF传递,但在这种情况下它会有所不同所以我想知道如何完成它?
以下是第一个示例中的两个示例,我们有两列要添加,第二个示例中我们有三列要添加.
我正在尝试使用现有列集上的groupby聚合在Pyspark中创建新列表.下面提供了一个示例输入数据框:
------------------------
id | date | value
------------------------
1 |2014-01-03 | 10
1 |2014-01-04 | 5
1 |2014-01-05 | 15
1 |2014-01-06 | 20
2 |2014-02-10 | 100
2 |2014-03-11 | 500
2 |2014-04-15 | 1500
Run Code Online (Sandbox Code Playgroud)
预期的产出是:
id | value_list
------------------------
1 | [10, 5, 15, 20]
2 | [100, 500, 1500]
Run Code Online (Sandbox Code Playgroud)
列表中的值按日期排序.
我尝试使用collect_list如下:
from pyspark.sql import functions as F
ordered_df = input_df.orderBy(['id','date'],ascending = True)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))
Run Code Online (Sandbox Code Playgroud)
但即使我在聚合之前按日期对输入数据框进行排序,collect_list也不保证顺序.
有人可以通过保留基于第二个(日期)变量的订单来帮助如何进行聚合吗?
我有一个非常大的数据集,在Hive中加载.它由大约190万行和1450列组成.我需要确定每个列的"覆盖率",即每个列具有非NaN值的行的分数.
这是我的代码:
from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string
sc = SparkContext(appName="compute_coverages") ## Create the context
sqlContext = HiveContext(sc)
df = sqlContext.sql("select * from data_table")
nrows_tot = df.count()
covgs=sc.parallelize(df.columns)
.map(lambda x: str(x))
.map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))
Run Code Online (Sandbox Code Playgroud)
在pyspark shell中尝试这个,如果我然后执行covgs.take(10),它会返回一个相当大的错误堆栈.它说保存文件时出现问题/usr/lib64/python2.6/pickle.py.这是错误的最后一部分:
py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
如果有更好的方法来实现这一点,而不是我正在尝试的方式,我愿意接受建议.我不能使用pandas,因为它目前在我工作的集群上不可用,我无权安装它.
我有这个JSON文件
{
"a": 1,
"b": 2
}
Run Code Online (Sandbox Code Playgroud)
这是用Python json.dump方法获得的.现在,我想使用pyspark将此文件读入Spark中的DataFrame.以下文档,我正在这样做
sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.read.json('my_file.json')
print df.show()
print语句虽然吐出了这个:
+---------------+
|_corrupt_record|
+---------------+
| {|
| "a": 1, |
| "b": 2|
| }|
+---------------+
Run Code Online (Sandbox Code Playgroud)
任何人都知道发生了什么以及为什么它没有正确解释文件?
我想修改数据帧列(Age)的单元格值,其中当前它是空白的,我只会在另一列(Survived)的值为0时为相应的行进行修改,其中Age为空白.如果它在Survived列中为1但在Age列中为空,那么我将它保持为null.
我试图使用&&运算符,但它没有用.这是我的代码:
tdata.withColumn("Age", when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
Run Code Online (Sandbox Code Playgroud)
任何建议如何处理?谢谢.
错误信息:
SyntaxError: invalid syntax
File "<ipython-input-33-3e691784411c>", line 1
tdata.withColumn("Age", when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
^
Run Code Online (Sandbox Code Playgroud) 我正在使用Spark和PySpark.我试图实现相当于以下伪代码的结果:
df = df.withColumn('new_column',
IF fruit1 == fruit2 THEN 1, ELSE 0. IF fruit1 IS NULL OR fruit2 IS NULL 3.)
Run Code Online (Sandbox Code Playgroud)
我试图在PySpark中这样做,但我不确定语法.有什么指针吗?我调查expr()但无法让它工作.
请注意,这df是一个pyspark.sql.dataframe.DataFrame.
我有一个Spark Dataframe,其中包含一系列日期:
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
import pandas as pd
rdd = sc.parallelizesc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'),
('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'),
('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),
('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'),
('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')])
schema = StructType([StructField('ID', StringType(), True),
StructField('EndDateTime', StringType(), True),
StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)
我想做的是duration通过减去EndDateTime和找到StartDateTime.我想我会尝试使用函数执行此操作:
# Function to calculate time delta
def time_delta(y,x):
end = pd.to_datetime(y)
start = pd.to_datetime(x)
delta = (end-start)
return delta
# create new RDD and add new column 'Duration' by applying …Run Code Online (Sandbox Code Playgroud) 我正在使用Spark 1.3.1.
我试图在Python中查看Spark数据帧列的值.使用Spark数据帧,我可以df.collect()查看数据帧的内容,但是我没有看到Spark数据帧列的最佳方法.
例如,数据框df包含一个名为的列'zip_code'.所以我可以做df['zip_code'],它变成了一个pyspark.sql.dataframe.Column类型,但我找不到一种方法来查看值df['zip_code'].
我正在尝试使用列表过滤pyspark中的数据帧.我想要根据列表进行过滤,或者仅包含列表中具有值的记录.我的代码不起作用:
# define a dataframe
rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])
# define a list of scores
l = [10,18,20]
# filter out records by scores by list l
records = df.filter(df.score in l)
# expected: (0,1), (0,1), (0,2), (1,2)
# include only records with these scores in list l
records = df.where(df.score in l)
# expected: (1,10), (1,20), (3,18), (3,18), (3,18)
Run Code Online (Sandbox Code Playgroud)
给出以下错误:ValueError:无法将列转换为bool:请使用'&'代表'和','|' 对于'或','〜'表示构建DataFrame布尔表达式时的'not'.
pyspark ×10
apache-spark ×8
python ×5
dataframe ×4
filter ×1
hive ×1
hiveql ×1
json ×1
pyspark-sql ×1