我有一个DataFrame生成如下:
df.groupBy($"Hour", $"Category")
.agg(sum($"value") as "TotalValue")
.sort($"Hour".asc, $"TotalValue".desc))
Run Code Online (Sandbox Code Playgroud)
结果如下:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)
如您所见,DataFrame按Hour递增顺序排序,然后按TotalValue降序排序.
我想选择每组的顶行,即
我几乎肯定以前曾经问过这个问题,但是通过stackoverflow搜索并没有回答我的问题.不是[2]的重复,因为我想要最大值,而不是最频繁的项目.我是pyspark的新手,并且尝试做一些非常简单的事情:我想将groupBy列"A"分组,然后只保留每个组中具有"B"列中最大值的行.像这样:
df_cleaned = df.groupBy("A").agg(F.max("B"))
Run Code Online (Sandbox Code Playgroud)
不幸的是,这会抛弃所有其他列 - df_cleaned只包含列"A"和B的最大值.我如何保留行?("A","B","C"......)
我有一个用例,我需要删除数据帧的重复行(在这种情况下,重复意味着它们具有相同的'id'字段),同时保持具有最高'timestamp'(unix timestamp)字段的行.
我找到了drop_duplicate方法(我正在使用pyspark),但是没有人控制将保留哪个项目.
有人可以帮忙吗?Thx提前
我想重写一些用RDD编写的代码来使用DataFrames.在我找到这个之前,它工作得非常顺利:
events
.keyBy(row => (row.getServiceId + row.getClientCreateTimestamp + row.getClientId, row) )
.reduceByKey((e1, e2) => if(e1.getClientSendTimestamp <= e2.getClientSendTimestamp) e1 else e2)
.values
Run Code Online (Sandbox Code Playgroud)
它很简单
events
.groupBy(events("service_id"), events("client_create_timestamp"), events("client_id"))
Run Code Online (Sandbox Code Playgroud)
但下一步是什么?如果我想迭代当前组中的每个元素怎么办?它甚至可能吗?提前致谢.
我有一个包含多列的spark DataFrame.我想基于一列对行进行分组,然后为每个组找到第二列的模式.使用pandas DataFrame,我会做这样的事情:
rand_values = np.random.randint(max_value,
size=num_values).reshape((num_values/2, 2))
rand_values = pd.DataFrame(rand_values, columns=['x', 'y'])
rand_values['x'] = rand_values['x'] > max_value/2
rand_values['x'] = rand_values['x'].astype('int32')
print(rand_values)
## x y
## 0 0 0
## 1 0 4
## 2 0 1
## 3 1 1
## 4 1 2
def mode(series):
return scipy.stats.mode(series['y'])[0][0]
rand_values.groupby('x').apply(mode)
## x
## 0 4
## 1 1
## dtype: int64
Run Code Online (Sandbox Code Playgroud)
在pyspark中,我能够找到单列的模式
df = sql_context.createDataFrame(rand_values)
def mode_spark(df, column):
# Group by column and count the number of occurrences
# of …Run Code Online (Sandbox Code Playgroud) 与 Kafka 的日志压缩类似,有很多用例需要仅保留给定键的最后更新,并将结果用于例如连接数据。
如何将其存档在 spark 结构化流中(最好使用 PySpark)?
例如假设我有桌子
key | time | value
----------------------------
A | 1 | foo
B | 2 | foobar
A | 2 | bar
A | 15 | foobeedoo
Run Code Online (Sandbox Code Playgroud)
现在我想保留每个键的最后一个值作为状态(带水印),即有权访问数据帧
key | time | value
----------------------------
B | 2 | foobar
A | 15 | foobeedoo
Run Code Online (Sandbox Code Playgroud)
我可能想加入另一个流。
优选地,这应该在不浪费一个支持的聚合步骤的情况下完成。我想我需要一种dropDuplicates()反向顺序的函数。
请注意,这个问题是关于结构化流媒体以及如何在不浪费聚合步骤的构造的情况下解决问题的(因此,所有带有窗口函数或最大聚合的东西都不是一个好的答案)。(如果您不知道:结构化流媒体现在不支持链式聚合。)
我有一个类似于的数据框
from pyspark.sql.functions import avg, first
rdd = sc.parallelize(
[
(0, "A", 223,"201603", "PORT"),
(0, "A", 22,"201602", "PORT"),
(0, "A", 22,"201603", "PORT"),
(0, "C", 22,"201605", "PORT"),
(0, "D", 422,"201601", "DOCK"),
(0, "D", 422,"201602", "DOCK"),
(0, "C", 422,"201602", "DOCK"),
(1,"B", 3213,"201602", "DOCK"),
(1,"A", 3213,"201602", "DOCK"),
(1,"C", 3213,"201602", "PORT"),
(1,"B", 3213,"201601", "PORT"),
(1,"B", 3213,"201611", "PORT"),
(1,"B", 3213,"201604", "PORT"),
(3,"D", 3999,"201601", "PORT"),
(3,"C", 323,"201602", "PORT"),
(3,"C", 323,"201602", "PORT"),
(3,"C", 323,"201605", "DOCK"),
(3,"A", 323,"201602", "DOCK"),
(2,"C", 2321,"201601", "DOCK"),
(2,"A", 2321,"201602", "PORT")
]
)
df_data …Run Code Online (Sandbox Code Playgroud) 我的数据框看起来像
+-------------------------+-----+
| Title| Status|Suite|ID |Time |
+------+-------+-----+----+-----+
|KIM | Passed|ABC |123 |20 |
|KJT | Passed|ABC |123 |10 |
|ZXD | Passed|CDF |123 |15 |
|XCV | Passed|GHY |113 |36 |
|KJM | Passed|RTH |456 |45 |
|KIM | Passed|ABC |115 |47 |
|JY | Passed|JHJK |8963|74 |
|KJH | Passed|SNMP |256 |47 |
|KJH | Passed|ABC |123 |78 |
|LOK | Passed|GHY |456 |96 |
|LIM | Passed|RTH |113 |78 |
|MKN | Passed|ABC |115 |74 |
|KJM …Run Code Online (Sandbox Code Playgroud) 我的数据如下所示:
id | duration | action1 | action2 | ...
---------------------------------------------
1 | 10 | A | D
1 | 10 | B | E
2 | 25 | A | E
1 | 7 | A | G
Run Code Online (Sandbox Code Playgroud)
我想按 ID 对其进行分组(效果很好!):
df.rdd.groupBy(lambda x: x['id']).mapValues(list).collect()
Run Code Online (Sandbox Code Playgroud)
现在我想按持续时间对每个组内的值进行分组以获得如下所示的内容:
[(id=1,
((duration=10,[(action1=A,action2=D),(action1=B,action2=E),
(duration=7,(action1=A,action2=G)),
(id=2,
((duration=25,(action1=A,action2=E)))]
Run Code Online (Sandbox Code Playgroud)
这是我不知道如何进行嵌套组的地方。有小费吗?
我有一个data frame在pyspark像下面。
df.show()
+---+----+
| id|test|
+---+----+
| 1| Y|
| 1| N|
| 2| Y|
| 3| N|
+---+----+
Run Code Online (Sandbox Code Playgroud)
我想在有重复记录时删除记录id并且test是N
现在当我查询 new_df
new_df.show()
+---+----+
| id|test|
+---+----+
| 1| Y|
| 2| Y|
| 3| N|
+---+----+
Run Code Online (Sandbox Code Playgroud)
我无法弄清楚用例。
我已经完成了 groupbyid计数,但它只给出了id列和count.
我做了如下。
grouped_df = new_df.groupBy("id").count()
Run Code Online (Sandbox Code Playgroud)
我怎样才能达到我想要的结果
编辑
我有一个如下所示的数据框。
+-------------+--------------------+--------------------+
| sn| device| attribute|
+-------------+--------------------+--------------------+
|4MY16A5602E0A| Android Phone| N|
|4MY16A5W02DE8| Android Phone| N|
|4MY16A5W02DE8| Android Phone| …Run Code Online (Sandbox Code Playgroud)