相关疑难解决方法(0)

如何选择每组的第一行?

我有一个DataFrame生成如下:

df.groupBy($"Hour", $"Category")
  .agg(sum($"value") as "TotalValue")
  .sort($"Hour".asc, $"TotalValue".desc))
Run Code Online (Sandbox Code Playgroud)

结果如下:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)

如您所见,DataFrame按Hour递增顺序排序,然后按TotalValue降序排序.

我想选择每组的顶行,即

  • 来自小时组== 0选择(0,cat26,30.9)
  • 来自小时组== 1选择(1,cat67,28.5)
  • 来自小时组== …

sql scala dataframe apache-spark apache-spark-sql

122
推荐指数
3
解决办法
8万
查看次数

GroupBy在Pyspark中具有最大值的列和过滤器行

我几乎肯定以前曾经问过这个问题,但是通过stackoverflow搜索并没有回答我的问题.不是[2]的重复,因为我想要最大值,而不是最频繁的项目.我是pyspark的新手,并且尝试做一些非常简单的事情:我想将groupBy列"A"分组,然后只保留每个组中具有"B"列中最大值的行.像这样:

df_cleaned = df.groupBy("A").agg(F.max("B"))
Run Code Online (Sandbox Code Playgroud)

不幸的是,这会抛弃所有其他列 - df_cleaned只包含列"A"和B的最大值.我如何保留行?("A","B","C"......)

python apache-spark apache-spark-sql pyspark

19
推荐指数
3
解决办法
2万
查看次数

spark:如何在保持最高时间戳行的同时对数据帧执行dropDuplicates

我有一个用例,我需要删除数据帧的重复行(在这种情况下,重复意味着它们具有相同的'id'字段),同时保持具有最高'timestamp'(unix timestamp)字段的行.

我找到了drop_duplicate方法(我正在使用pyspark),但是没有人控制将保留哪个项目.

有人可以帮忙吗?Thx提前

dataframe apache-spark pyspark spark-dataframe

8
推荐指数
2
解决办法
8951
查看次数

如何在Spark中对GroupedData进行自定义操作?

我想重写一些用RDD编写的代码来使用DataFrames.在我找到这个之前,它工作得非常顺利:

 events
  .keyBy(row => (row.getServiceId + row.getClientCreateTimestamp + row.getClientId, row) )
  .reduceByKey((e1, e2) => if(e1.getClientSendTimestamp <= e2.getClientSendTimestamp) e1 else e2)
  .values
Run Code Online (Sandbox Code Playgroud)

它很简单

 events
  .groupBy(events("service_id"), events("client_create_timestamp"), events("client_id"))
Run Code Online (Sandbox Code Playgroud)

但下一步是什么?如果我想迭代当前组中的每个元素怎么办?它甚至可能吗?提前致谢.

grouping scala apache-spark

6
推荐指数
1
解决办法
3267
查看次数

(py)Spark中分组数据的模式

我有一个包含多列的spark DataFrame.我想基于一列对行进行分组,然后为每个组找到第二列的模式.使用pandas DataFrame,我会做这样的事情:

rand_values = np.random.randint(max_value,
                                size=num_values).reshape((num_values/2, 2))
rand_values = pd.DataFrame(rand_values, columns=['x', 'y'])
rand_values['x'] = rand_values['x'] > max_value/2
rand_values['x'] = rand_values['x'].astype('int32')

print(rand_values)
##    x  y
## 0  0  0
## 1  0  4
## 2  0  1
## 3  1  1
## 4  1  2

def mode(series):
    return scipy.stats.mode(series['y'])[0][0]

rand_values.groupby('x').apply(mode)
## x
## 0    4
## 1    1
## dtype: int64
Run Code Online (Sandbox Code Playgroud)

在pyspark中,我能够找到单列的模式

df = sql_context.createDataFrame(rand_values)

def mode_spark(df, column):
    # Group by column and count the number of occurrences
    # of …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark spark-dataframe

6
推荐指数
1
解决办法
1万
查看次数

保留 Spark 结构化流中给定键的最后一行

与 Kafka 的日志压缩类似,有很多用例需要仅保留给定键的最后更新,并将结果用于例如连接数据。

如何将其存档在 spark 结构化流中(最好使用 PySpark)?

例如假设我有桌子

key    | time   | value
----------------------------
A      | 1      | foo
B      | 2      | foobar
A      | 2      | bar
A      | 15     | foobeedoo
Run Code Online (Sandbox Code Playgroud)

现在我想保留每个键的最后一个值作为状态(带水印),即有权访问数据帧

key    | time   | value
----------------------------
B      | 2      | foobar
A      | 15     | foobeedoo
Run Code Online (Sandbox Code Playgroud)

我可能想加入另一个流。

优选地,这应该在不浪费一个支持的聚合步骤的情况下完成。我想我需要一种dropDuplicates()反向顺序的函数。

请注意,这个问题是关于结构化流媒体以及如何在不浪费聚合步骤的构造的情况下解决问题的(因此,所有带有窗口函数或最大聚合的东西都不是一个好的答案)。(如果您不知道:结构化流媒体现在不支持链式聚合。)

apache-spark pyspark spark-structured-streaming

5
推荐指数
1
解决办法
1103
查看次数

通过PySpark中的几列从groupby获取具有最大值的行

我有一个类似于的数据框

from pyspark.sql.functions import avg, first

rdd = sc.parallelize(
[
(0, "A", 223,"201603", "PORT"), 
(0, "A", 22,"201602", "PORT"), 
(0, "A", 22,"201603", "PORT"), 
(0, "C", 22,"201605", "PORT"), 
(0, "D", 422,"201601", "DOCK"), 
(0, "D", 422,"201602", "DOCK"), 
(0, "C", 422,"201602", "DOCK"), 
(1,"B", 3213,"201602", "DOCK"), 
(1,"A", 3213,"201602", "DOCK"), 
(1,"C", 3213,"201602", "PORT"), 
(1,"B", 3213,"201601", "PORT"), 
(1,"B", 3213,"201611", "PORT"), 
(1,"B", 3213,"201604", "PORT"), 
(3,"D", 3999,"201601", "PORT"), 
(3,"C", 323,"201602", "PORT"), 
(3,"C", 323,"201602", "PORT"), 
(3,"C", 323,"201605", "DOCK"), 
(3,"A", 323,"201602", "DOCK"), 
(2,"C", 2321,"201601", "DOCK"),
(2,"A", 2321,"201602", "PORT")
]
)
df_data …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

3
推荐指数
1
解决办法
3701
查看次数

如何应用groupby条件并获取结果中的所有列?

我的数据框看起来像

+-------------------------+-----+
| Title| Status|Suite|ID  |Time |
+------+-------+-----+----+-----+
|KIM   | Passed|ABC  |123 |20   |
|KJT   | Passed|ABC  |123 |10   |
|ZXD   | Passed|CDF  |123 |15   |
|XCV   | Passed|GHY  |113 |36   |
|KJM   | Passed|RTH  |456 |45   |
|KIM   | Passed|ABC  |115 |47   |
|JY    | Passed|JHJK |8963|74   |
|KJH   | Passed|SNMP |256 |47   |
|KJH   | Passed|ABC  |123 |78   |
|LOK   | Passed|GHY  |456 |96   |
|LIM   | Passed|RTH  |113 |78   |
|MKN   | Passed|ABC  |115 |74   |
|KJM …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

2
推荐指数
1
解决办法
4556
查看次数

在pyspark中链接多个groupBy

我的数据如下所示:

   id | duration | action1 | action2 | ...
 ---------------------------------------------
    1 | 10       |   A     |   D
    1 | 10       |   B     |   E 
    2 | 25       |   A     |   E
    1 | 7        |   A     |   G
Run Code Online (Sandbox Code Playgroud)

我想按 ID 对其进行分组(效果很好!):

df.rdd.groupBy(lambda x: x['id']).mapValues(list).collect()
Run Code Online (Sandbox Code Playgroud)

现在我想按持续时间对每个组内的值进行分组以获得如下所示的内容:

    [(id=1,
      ((duration=10,[(action1=A,action2=D),(action1=B,action2=E),
       (duration=7,(action1=A,action2=G)),

     (id=2,
       ((duration=25,(action1=A,action2=E)))]
Run Code Online (Sandbox Code Playgroud)

这是我不知道如何进行嵌套组的地方。有小费吗?

python rdd pyspark

2
推荐指数
1
解决办法
4977
查看次数

基于其他列pyspark删除重复记录

我有一个data framepyspark像下面。

df.show()
+---+----+
| id|test|
+---+----+
|  1|   Y|
|  1|   N|
|  2|   Y|
|  3|   N|
+---+----+
Run Code Online (Sandbox Code Playgroud)

我想在有重复记录时删除记录id并且testN

现在当我查询 new_df

new_df.show()
+---+----+
| id|test|
+---+----+
|  1|   Y|
|  2|   Y|
|  3|   N|
+---+----+
Run Code Online (Sandbox Code Playgroud)

我无法弄清楚用例。

我已经完成了 groupbyid计数,但它只给出了id列和count.

我做了如下。

grouped_df = new_df.groupBy("id").count()
Run Code Online (Sandbox Code Playgroud)

我怎样才能达到我想要的结果

编辑

我有一个如下所示的数据框。

+-------------+--------------------+--------------------+
|           sn|              device|           attribute|
+-------------+--------------------+--------------------+
|4MY16A5602E0A|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone| …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

0
推荐指数
1
解决办法
1003
查看次数