标签: pyspark-sql

Spark联合列顺序

我最近在 Spark 中遇到了一些奇怪的事情。据我了解,鉴于spark dfs的基于列的存储方法,列的顺序确实没有任何意义,它们就像字典中的键。

在 a 期间df.union(df2),列的顺序重要吗?我会假设它不应该,但根据 sql 论坛的智慧,它确实如此。

所以我们有 df1

df1
|  a|   b|
+---+----+
|  1| asd|
|  2|asda|
|  3| f1f|
+---+----+

df2
|   b|  a|
+----+---+
| asd|  1|
|asda|  2|
| f1f|  3|
+----+---+

result
|   a|   b|
+----+----+
|   1| asd|
|   2|asda|
|   3| f1f|
| asd|   1|
|asda|   2|
| f1f|   3|
+----+----+

Run Code Online (Sandbox Code Playgroud)

看起来使用了 df1 中的架构,但数据似乎已按照其原始数据帧的顺序加入。显然,解决方案是这样做df1.union(df2.select(df1.columns))

但主要问题是,它为什么要这样做?仅仅是因为它是 pyspark.sql 的一部分,还是 Spark 中有一些我在理解上搞砸了的底层数据架构?

如果有人想尝试创建测试集的代码

d1={'a':[1,2,3], 'b':['asd','asda','f1f']}
d2={ 'b':['asd','asda','f1f'], 'a':[1,2,3],}
pdf1=pd.DataFrame(d1)
pdf2=pd.DataFrame(d2) …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark pyspark-sql

26
推荐指数
2
解决办法
2万
查看次数

使用isin排除过滤pyspark数据帧

我试图获取数据框中的所有行,其中列值不在列表中(因此通过排除进行过滤).

举个例子:

df = sqlContext.createDataFrame([('1','a'),('2','b'),('3','b'),('4','c'),('5','d')]
,schema=('id','bar'))
Run Code Online (Sandbox Code Playgroud)

我得到了数据框:

+---+---+
| id|bar|
+---+---+
|  1|  a|
|  2|  b|
|  3|  b|
|  4|  c|
|  5|  d|
+---+---+
Run Code Online (Sandbox Code Playgroud)

我只想排除bar所在的行('a'或'b').

使用SQL表达式字符串,它将是:

df.filter('bar not in ("a","b")').show()
Run Code Online (Sandbox Code Playgroud)

有没有办法在不使用SQL表达式的字符串或一次排除一个项目的情况下执行此操作?

编辑:

我可能有一个列表,['a','b'],我想要使用的排除值.

python apache-spark pyspark pyspark-sql

21
推荐指数
4
解决办法
7万
查看次数

如何在pyspark中的groupBy之后计算唯一ID

我每年使用以下代码来聚集学生.目的是了解每年的学生总数.

from pyspark.sql.functions import col
import pyspark.sql.functions as fn
gr = Df2.groupby(['Year'])
df_grouped = 
gr.agg(fn.count(col('Student_ID')).alias('total_student_by_year'))
Run Code Online (Sandbox Code Playgroud)

结果是:

[学生按年份] [1]

我发现有这么多ID重复的问题所以结果是错误的和巨大的.

我希望按年份对学生进行聚集,按年计算学生总数,并将ID重复计算.

我希望这个问题很清楚.我是新成员谢谢

python pyspark spark-dataframe pyspark-sql

21
推荐指数
2
解决办法
4万
查看次数

在PySpark数据帧中修剪字符串列

我是Python和Spark的初学者.创建一个DataFramefrom CSV文件后,我想知道如何修剪一个列.我试过了:

df = df.withColumn("Product", df.Product.strip())
Run Code Online (Sandbox Code Playgroud)

df是我的数据框,Product是我表中的一列

但我总是看到错误:

Column object is not callable

你有什么建议吗?

trim apache-spark apache-spark-sql pyspark pyspark-sql

19
推荐指数
5
解决办法
4万
查看次数

如何基于Pyspark中另一列的表达式评估有条件地替换列中的值?

import numpy as np

df = spark.createDataFrame(
    [(1, 1, None),
     (1, 2, float(5)),
     (1, 3, np.nan),
     (1, 4, None),
     (0, 5, float(10)),
     (1, 6, float('nan')),
     (0, 6, float('nan'))],
    ('session', "timestamp1", "id2"))
Run Code Online (Sandbox Code Playgroud)
+-------+----------+----+
|session|timestamp1| id2|
+-------+----------+----+
|      1|         1|null|
|      1|         2| 5.0|
|      1|         3| NaN|
|      1|         4|null|
|      0|         5|10.0|
|      1|         6| NaN|
|      0|         6| NaN|
+-------+----------+----+
Run Code Online (Sandbox Code Playgroud)

当session == 0时,如何用值999替换timestamp1列的值?

预期产出

+-------+----------+----+
|session|timestamp1| id2|
+-------+----------+----+
|      1|         1|null|
|      1|         2| 5.0|
|      1|         3| NaN| …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark pyspark-sql

19
推荐指数
1
解决办法
3万
查看次数

Pyspark:根据多个条件过滤数据框

我想首先根据以下条件过滤数据帧(d <5),其次(如果col1中的值等于col3中的对应项,则col2的值不等于col4中的对应值).

如果原始数据帧DF如下:

+----+----+----+----+---+
|col1|col2|col3|col4|  d|
+----+----+----+----+---+
|   A|  xx|   D|  vv|  4|
|   C| xxx|   D|  vv| 10|
|   A|   x|   A|  xx|  3|
|   E| xxx|   B|  vv|  3|
|   E| xxx|   F| vvv|  6|
|   F|xxxx|   F| vvv|  4|
|   G| xxx|   G| xxx|  4|
|   G| xxx|   G|  xx|  4|
|   G| xxx|   G| xxx| 12|
|   B|xxxx|   B|  xx| 13|
+----+----+----+----+---+
Run Code Online (Sandbox Code Playgroud)

所需的Dataframe是:

+----+----+----+----+---+
|col1|col2|col3|col4|  d|
+----+----+----+----+---+
|   A|  xx|   D|  vv|  4|
| …
Run Code Online (Sandbox Code Playgroud)

sql filter apache-spark-sql pyspark pyspark-sql

19
推荐指数
3
解决办法
6万
查看次数

Spark - 阅读时如何跳过或忽略空的gzip文件

我有几百个文件夹,每个文件夹有成千上万的gzip文本文件,我试图将它们读入数据框中spark.read.csv().

在这些文件中,有些文件的长度为零,导致错误:

java.io.EOFException:输入流的意外结束

码:

df = spark.read.csv('s3n://my-bucket/folder*/logfiles*.log.gz',sep='\t',schema=schema)
Run Code Online (Sandbox Code Playgroud)

我已经尝试设置modeDROPMALFORMED与阅读sc.textFile(),但没有运气.

处理空的或损坏的gzip文件的最佳方法是什么?

pyspark spark-dataframe pyspark-sql

18
推荐指数
1
解决办法
4386
查看次数

Pyspark将标准列表转换为数据框

这种情况非常简单,我需要使用以下代码将python列表转换为数据框

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType, IntegerType

schema = StructType([StructField("value", IntegerType(), True)])
my_list = [1, 2, 3, 4]
rdd = sc.parallelize(my_list)
df = sqlContext.createDataFrame(rdd, schema)

df.show()
Run Code Online (Sandbox Code Playgroud)

它失败并出现以下错误:

    raise TypeError("StructType can not accept object %r in type %s" % (obj, type(obj)))
TypeError: StructType can not accept object 1 in type <class 'int'>
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark pyspark-sql

18
推荐指数
2
解决办法
3万
查看次数

应用Window函数计算pySpark中的差异

我正在使用pySpark,并设置了我的数据框,其中两列代表每日资产价格,如下所示:

ind = sc.parallelize(range(1,5))
prices = sc.parallelize([33.3,31.1,51.2,21.3])
data = ind.zip(prices)
df = sqlCtx.createDataFrame(data,["day","price"])
Run Code Online (Sandbox Code Playgroud)

我开始申请df.show():

+---+-----+
|day|price|
+---+-----+
|  1| 33.3|
|  2| 31.1|
|  3| 51.2|
|  4| 21.3|
+---+-----+
Run Code Online (Sandbox Code Playgroud)

哪个好,一切都好.我想有另一个列,其中包含价格列的日常回报,即类似的内容

(price(day2)-price(day1))/(price(day1))

经过大量研究后,我被告知通过应用这些pyspark.sql.window功能可以最有效地完成,但我无法看到.

window-functions pyspark spark-dataframe pyspark-sql

17
推荐指数
2
解决办法
2万
查看次数

如何使用pyspark从某些字段分组的给定数据集中获取max(date)?

我有数据框中的数据如下:

  datetime             | userId | memberId | value |    
2016-04-06 16:36:...   | 1234   | 111      | 1
2016-04-06 17:35:...   | 1234   | 222      | 5
2016-04-06 17:50:...   | 1234   | 111      | 8
2016-04-06 18:36:...   | 1234   | 222      | 9
2016-04-05 16:36:...   | 4567   | 111      | 1
2016-04-06 17:35:...   | 4567   | 222      | 5
2016-04-06 18:50:...   | 4567   | 111      | 8
2016-04-06 19:36:...   | 4567   | 222      | 9
Run Code Online (Sandbox Code Playgroud)

我需要在userid,memberid中找到max(datetime)groupby.当我尝试如下:

df2 = df.groupBy('userId','memberId').max('datetime')
Run Code Online (Sandbox Code Playgroud)

我收到的错误是:

org.apache.spark.sql.AnalysisException: "datetime" …
Run Code Online (Sandbox Code Playgroud)

sql apache-spark apache-spark-sql pyspark pyspark-sql

17
推荐指数
1
解决办法
2万
查看次数