How to compare consecutive rows?

Tags: scala, apache-spark, spark-streaming, apache-spark-sql

I want to compare rows i and i-1 on col2, with the rows ordered by col1.

If the item in row i and the item in row i-1 (call it item_[i-1]) are different, I want to increment the count of item_[i-1] by 1.

+-----+------+
| col1|  col2|
+-----+------+
|row_1|item_1|
|row_2|item_1|
|row_3|item_2|
|row_4|item_1|
|row_5|item_2|
|row_6|item_1|
+-----+------+

In the example above, if we scan down two rows at a time, we see that row_2 and row_3 are different, so we add one to the count of item_1. Next, row_3 differs from row_4, so we add one to the count of item_2. We continue like this until we end up with:

+------+----+
|  col2|col3|
+------+----+
|item_1|   2|
|item_2|   2|
+------+----+
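To pin down the intended rule, here is a minimal plain-Scala sketch (no Spark; it assumes the rows are already sorted by col1) that applies the pairwise comparison to the sample data:

val rows = Seq(
  ("row_1", "item_1"),
  ("row_2", "item_1"),
  ("row_3", "item_2"),
  ("row_4", "item_1"),
  ("row_5", "item_2"),
  ("row_6", "item_1"))

// For every consecutive pair, credit the earlier item when the items differ.
val counts = rows
  .sliding(2)
  .collect { case Seq((_, prev), (_, next)) if prev != next => prev }
  .toSeq
  .groupBy(identity)
  .map { case (item, hits) => (item, hits.size) }

counts.foreach(println)   // (item_1,2) and (item_2,2), matching the table above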

Answer (8 votes)

You can do this with a combination of a window function and an aggregation. The window function is used to fetch the next value of col2 (with col1 used for the ordering). The aggregation then counts how many times we encounter a difference. This is implemented in the following code:

val data = Seq(
  ("row_1", "item_1"),
  ("row_2", "item_1"),
  ("row_3", "item_2"),
  ("row_4", "item_1"),
  ("row_5", "item_2"),
  ("row_6", "item_1")).toDF("col1", "col2")

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._   // lead, coalesce, sum (imported automatically in spark-shell)

// For every row, look up the next value of col2 (ordered by col1); the last
// row has no successor, so coalesce falls back to the row's own value.
val q = data.
  withColumn("col2_next",
    coalesce(lead($"col2", 1) over Window.orderBy($"col1"), $"col2")).
  groupBy($"col2").
  // A transition counts as 1 whenever col2 differs from the following value.
  agg(sum($"col2" =!= $"col2_next" cast "int") as "col3")

scala> q.show
17/08/22 10:15:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+----+
|  col2|col3|
+------+----+
|item_1|   2|
|item_2|   2|
+------+----+
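To see what the window function produces before the aggregation, you can inspect the intermediate column. This is a small sketch reusing data and the same window from above (same spark-shell session); the output below is what it yields for the sample data:

// Intermediate view: each row paired with the next row's col2.
val stepped = data.withColumn("col2_next",
  coalesce(lead($"col2", 1) over Window.orderBy($"col1"), $"col2"))

scala> stepped.show
+-----+------+---------+
| col1|  col2|col2_next|
+-----+------+---------+
|row_1|item_1|   item_1|
|row_2|item_1|   item_2|
|row_3|item_2|   item_1|
|row_4|item_1|   item_2|
|row_5|item_2|   item_1|
|row_6|item_1|   item_1|
+-----+------+---------+

The last row keeps its own value because lead returns null there and coalesce falls back to $"col2", so it never counts as a difference. The WARN message above appears because the window has no partitionBy clause, which forces all rows into a single partition; that is unavoidable for a global ordering and harmless at this scale.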