我最近在 Spark 中遇到了一些奇怪的事情。据我了解,鉴于spark dfs的基于列的存储方法,列的顺序确实没有任何意义,它们就像字典中的键。
在 a 期间df.union(df2),列的顺序重要吗?我会假设它不应该,但根据 sql 论坛的智慧,它确实如此。
所以我们有 df1
df1
| a| b|
+---+----+
| 1| asd|
| 2|asda|
| 3| f1f|
+---+----+
df2
| b| a|
+----+---+
| asd| 1|
|asda| 2|
| f1f| 3|
+----+---+
result
| a| b|
+----+----+
| 1| asd|
| 2|asda|
| 3| f1f|
| asd| 1|
|asda| 2|
| f1f| 3|
+----+----+
Run Code Online (Sandbox Code Playgroud)
看起来使用了 df1 中的架构,但数据似乎已按照其原始数据帧的顺序加入。显然,解决方案是这样做df1.union(df2.select(df1.columns))
但主要问题是,它为什么要这样做?仅仅是因为它是 pyspark.sql 的一部分,还是 Spark 中有一些我在理解上搞砸了的底层数据架构?
如果有人想尝试创建测试集的代码
d1={'a':[1,2,3], 'b':['asd','asda','f1f']}
d2={ 'b':['asd','asda','f1f'], 'a':[1,2,3],}
pdf1=pd.DataFrame(d1)
pdf2=pd.DataFrame(d2) …Run Code Online (Sandbox Code Playgroud) 我试图获取数据框中的所有行,其中列值不在列表中(因此通过排除进行过滤).
举个例子:
df = sqlContext.createDataFrame([('1','a'),('2','b'),('3','b'),('4','c'),('5','d')]
,schema=('id','bar'))
Run Code Online (Sandbox Code Playgroud)
我得到了数据框:
+---+---+
| id|bar|
+---+---+
| 1| a|
| 2| b|
| 3| b|
| 4| c|
| 5| d|
+---+---+
Run Code Online (Sandbox Code Playgroud)
我只想排除bar所在的行('a'或'b').
使用SQL表达式字符串,它将是:
df.filter('bar not in ("a","b")').show()
Run Code Online (Sandbox Code Playgroud)
有没有办法在不使用SQL表达式的字符串或一次排除一个项目的情况下执行此操作?
编辑:
我可能有一个列表,['a','b'],我想要使用的排除值.
我每年使用以下代码来聚集学生.目的是了解每年的学生总数.
from pyspark.sql.functions import col
import pyspark.sql.functions as fn
gr = Df2.groupby(['Year'])
df_grouped =
gr.agg(fn.count(col('Student_ID')).alias('total_student_by_year'))
Run Code Online (Sandbox Code Playgroud)
结果是:
[学生按年份] [1]
我发现有这么多ID重复的问题所以结果是错误的和巨大的.
我希望按年份对学生进行聚集,按年计算学生总数,并将ID重复计算.
我希望这个问题很清楚.我是新成员谢谢
我是Python和Spark的初学者.创建一个DataFramefrom CSV文件后,我想知道如何修剪一个列.我试过了:
df = df.withColumn("Product", df.Product.strip())
Run Code Online (Sandbox Code Playgroud)
df是我的数据框,Product是我表中的一列
但我总是看到错误:
Column object is not callable
你有什么建议吗?
import numpy as np
df = spark.createDataFrame(
[(1, 1, None),
(1, 2, float(5)),
(1, 3, np.nan),
(1, 4, None),
(0, 5, float(10)),
(1, 6, float('nan')),
(0, 6, float('nan'))],
('session', "timestamp1", "id2"))
Run Code Online (Sandbox Code Playgroud)
+-------+----------+----+
|session|timestamp1| id2|
+-------+----------+----+
| 1| 1|null|
| 1| 2| 5.0|
| 1| 3| NaN|
| 1| 4|null|
| 0| 5|10.0|
| 1| 6| NaN|
| 0| 6| NaN|
+-------+----------+----+
Run Code Online (Sandbox Code Playgroud)
当session == 0时,如何用值999替换timestamp1列的值?
预期产出
+-------+----------+----+
|session|timestamp1| id2|
+-------+----------+----+
| 1| 1|null|
| 1| 2| 5.0|
| 1| 3| NaN| …Run Code Online (Sandbox Code Playgroud) 我想首先根据以下条件过滤数据帧(d <5),其次(如果col1中的值等于col3中的对应项,则col2的值不等于col4中的对应值).
如果原始数据帧DF如下:
+----+----+----+----+---+
|col1|col2|col3|col4| d|
+----+----+----+----+---+
| A| xx| D| vv| 4|
| C| xxx| D| vv| 10|
| A| x| A| xx| 3|
| E| xxx| B| vv| 3|
| E| xxx| F| vvv| 6|
| F|xxxx| F| vvv| 4|
| G| xxx| G| xxx| 4|
| G| xxx| G| xx| 4|
| G| xxx| G| xxx| 12|
| B|xxxx| B| xx| 13|
+----+----+----+----+---+
Run Code Online (Sandbox Code Playgroud)
所需的Dataframe是:
+----+----+----+----+---+
|col1|col2|col3|col4| d|
+----+----+----+----+---+
| A| xx| D| vv| 4|
| …Run Code Online (Sandbox Code Playgroud) 我有几百个文件夹,每个文件夹有成千上万的gzip文本文件,我试图将它们读入数据框中spark.read.csv().
在这些文件中,有些文件的长度为零,导致错误:
java.io.EOFException:输入流的意外结束
码:
df = spark.read.csv('s3n://my-bucket/folder*/logfiles*.log.gz',sep='\t',schema=schema)
Run Code Online (Sandbox Code Playgroud)
我已经尝试设置mode到DROPMALFORMED与阅读sc.textFile(),但没有运气.
处理空的或损坏的gzip文件的最佳方法是什么?
这种情况非常简单,我需要使用以下代码将python列表转换为数据框
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType, IntegerType
schema = StructType([StructField("value", IntegerType(), True)])
my_list = [1, 2, 3, 4]
rdd = sc.parallelize(my_list)
df = sqlContext.createDataFrame(rdd, schema)
df.show()
Run Code Online (Sandbox Code Playgroud)
它失败并出现以下错误:
raise TypeError("StructType can not accept object %r in type %s" % (obj, type(obj)))
TypeError: StructType can not accept object 1 in type <class 'int'>
Run Code Online (Sandbox Code Playgroud)
我正在使用pySpark,并设置了我的数据框,其中两列代表每日资产价格,如下所示:
ind = sc.parallelize(range(1,5))
prices = sc.parallelize([33.3,31.1,51.2,21.3])
data = ind.zip(prices)
df = sqlCtx.createDataFrame(data,["day","price"])
Run Code Online (Sandbox Code Playgroud)
我开始申请df.show():
+---+-----+
|day|price|
+---+-----+
| 1| 33.3|
| 2| 31.1|
| 3| 51.2|
| 4| 21.3|
+---+-----+
Run Code Online (Sandbox Code Playgroud)
哪个好,一切都好.我想有另一个列,其中包含价格列的日常回报,即类似的内容
(price(day2)-price(day1))/(price(day1))
经过大量研究后,我被告知通过应用这些pyspark.sql.window功能可以最有效地完成,但我无法看到.
我有数据框中的数据如下:
datetime | userId | memberId | value |
2016-04-06 16:36:... | 1234 | 111 | 1
2016-04-06 17:35:... | 1234 | 222 | 5
2016-04-06 17:50:... | 1234 | 111 | 8
2016-04-06 18:36:... | 1234 | 222 | 9
2016-04-05 16:36:... | 4567 | 111 | 1
2016-04-06 17:35:... | 4567 | 222 | 5
2016-04-06 18:50:... | 4567 | 111 | 8
2016-04-06 19:36:... | 4567 | 222 | 9
Run Code Online (Sandbox Code Playgroud)
我需要在userid,memberid中找到max(datetime)groupby.当我尝试如下:
df2 = df.groupBy('userId','memberId').max('datetime')
Run Code Online (Sandbox Code Playgroud)
我收到的错误是:
org.apache.spark.sql.AnalysisException: "datetime" …Run Code Online (Sandbox Code Playgroud)