在 PySpark 中提取特定行

Har*_*pta 1 python apache-spark apache-spark-sql pyspark

我有一个这样的数据框

data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)), 
       (("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1|   A|    1|
|ID1|   B|    5|
|ID2|   A|   12|
|ID3|   A|    3|
|ID3|   B|    3|
|ID3|   C|    5|
|ID4|   A|   10|
+---+----+-----+
Run Code Online (Sandbox Code Playgroud)

我只想提取那些只包含一种特定类型 - “A”的行(或 ID)

因此我的预期输出将包含以下行

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|    1|
|ID4|   A|   10|
+---+----+-----+
Run Code Online (Sandbox Code Playgroud)

对于每个 ID 可以包含任何类型 - A、B、C 等。我想提取那些包含一个且仅包含一个类型的 ID - 'A'

我怎样才能在 PySpark 中实现这一点

rig*_*oin 8

您可以对其应用过滤器。

import pyspark.sql.functions as f

data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)), 
       (("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1|   A|    1|
|ID1|   B|    5|
|ID2|   A|   12|
|ID3|   A|    3|
|ID3|   B|    3|
|ID3|   C|    5|
|ID4|   A|   10|
+---+----+-----+

x= df.filter(f.col('Type')=='A')

x.show()

Run Code Online (Sandbox Code Playgroud)

如果我们需要过滤所有只有一条记录的 ID,并且类型为“A”,那么下面的代码可能是解决方案


df.registerTempTable('table1')


sqlContext.sql('select a.ID, a.Type,a.Value from table1 as a, (select ID, count(*) as cnt_val from table1 group by ID) b where a.ID = b.ID and (a.Type=="A" and b.cnt_val ==1)').show()


+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|   12|
|ID4|   A|   10|
+---+----+-----+


Run Code Online (Sandbox Code Playgroud)

会有更好的替代方法来找到相同的。

  • 我很抱歉,这不是 OP 所要求的。您只是过滤掉其中包含“A”的行。在哪里,OP 要求那些只有“A”的“ID”,“和”没有其他字母。 (2认同)

cph*_*sto 6

按照 OP 的要求,我正在记下我在评论下写的答案。

手头问题的目的是过滤掉DataFrame每个特定ID元素只有一个元素Type A而没有其他元素的地方。

# Loading the requisite packages
from pyspark.sql.functions import col, collect_set, array_contains, size, first
Run Code Online (Sandbox Code Playgroud)

我们的想法是aggregate()DataFrameID第一,由此我们组中的所有unique的元件Type使用collect_set()在阵列中。拥有unique元素很重要,因为对于特定的元素ID可能会有两行,两行都具有Typeas A。这就是为什么我们应该使用collect_set()而不是collect_list()因为后者不会返回唯一元素,而是所有元素。

然后我们应该使用first()来获取组中Type和的第一个值Value。如果 caseAunique Type特定的唯一可能ID,那么first()将返回AcaseA发生一次的唯一值,如果有 的 重复,则返回最高值A

df = df = df.groupby(['ID']).agg(first(col('Type')).alias('Type'),
                                 first(col('Value')).alias('Value'),
                                 collect_set('Type').alias('Type_Arr'))
df.show()
+---+----+-----+---------+
| ID|Type|Value| Type_Arr|
+---+----+-----+---------+
|ID2|   A|   12|      [A]|
|ID3|   A|    3|[A, B, C]|
|ID1|   A|    1|   [A, B]|
|ID4|   A|   10|      [A]|
+---+----+-----+---------+
Run Code Online (Sandbox Code Playgroud)

最后,我们将同时放置 2 个条件来过滤出所需的数据集。

条件 1:它检查usingA数组中是否存在。Typearray_contains()

条件 2:它检查size数组的 。如果大小大于 1,则应该有多个Types.

df = df.where(array_contains(col('Type_Arr'),'A') & (size(col('Type_Arr'))==1)).drop('Type_Arr')
df.show()
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|   12|
|ID4|   A|   10|
+---+----+-----+
Run Code Online (Sandbox Code Playgroud)

  • 巧用先。 (2认同)