Amo*_*ong · 4 · tags: python, dataframe, apache-spark, apache-spark-sql, pyspark
I have the following pySpark dataframe:
+------------------+------------------+--------------------+--------------+-------+
| col1| col2| col3| X| Y|
+------------------+------------------+--------------------+--------------+-------+
|2.1729247374294496| 3.558069532647046| 6.607603368496324| 1| null|
|0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| null|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| null|
| 2.608497168338446| 3.529397129549324| 0.373034222141551| 2| null|
+------------------+------------------+--------------------+--------------+-------+
This is a fairly simple operation that I could easily do with pandas. However, I need to use only pySpark.
I want to do the following (in pseudocode):
In the row where col3 == max(col3), change Y from null to 'K';
In the remaining rows, in the row where col1 == max(col1), change Y from null to 'Z';
In the remaining rows, in the row where col1 == min(col1), change Y from null to 'U';
In the remaining rows: change Y from null to 'I'.
So the expected output is:
+------------------+------------------+--------------------+--------------+-------+
| col1| col2| col3| X| Y|
+------------------+------------------+--------------------+--------------+-------+
|2.1729247374294496| 3.558069532647046| 6.607603368496324| 1| K|
|0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| U|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| I|
| 2.608497168338446| 3.529397129549324| 0.373034222141551| 2| Z|
+------------------+------------------+--------------------+--------------+-------+
Once that is done, I need to use this table as a lookup for another table:
+--------------------+--------+-----+------------------+--------------+------------+
| x1| x2| x3| x4| X| d|
+--------------------+--------+-----+------------------+--------------+------------+
|0057f68a-6330-42a...| 2876| 30| 5.989999771118164| 0| 20171219|
|05cc0191-4ee4-412...| 108381| 34|24.979999542236328| 3| 20171219|
|06f353af-e9d3-4d0...| 118798| 34| 0.0| 3| 20171219|
|0c69b607-112b-4f3...| 20993| 34| 0.0| 0| 20171219|
|0d1b52ba-1502-4ff...| 23817| 34| 0.0| 0| 20171219|
I want to create a new column in the second table, using the first table as a lookup: for each value in the second table's X column, look up the row with that X value in the first table and take its Y value; that Y value becomes the new column's value.
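In other words, the first table defines an X → Y mapping. A minimal pure-Python sketch of the intended lookup semantics (toy values taken from the expected output above, not the Spark code):

```python
# The first table maps X -> Y; each row of the second table picks up
# the Y that matches its own X value.
first_table = {1: "K", 0: "U", 3: "I", 2: "Z"}  # X -> Y, per the expected output

second_table_x = [0, 3, 3, 0, 0]  # the X column of the second table shown above
new_column = [first_table[x] for x in second_table_x]
print(new_column)  # ['U', 'I', 'I', 'U', 'U']
```

In Spark this dictionary lookup becomes an equi-join on X, as shown in the answer below.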
UPD: I need a robust solution that handles the case where one row satisfies two conditions at once, for example:
+------------------+------------------+--------------------+--------------+-------+
| col1| col2| col3| X| Y|
+------------------+------------------+--------------------+--------------+-------+
| 2.608497168338446| 3.558069532647046| 6.607603368496324| 1| null|
|0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| null|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| null|
|2.1729247374294496| 3.529397129549324| 0.373034222141551| 2| null|
+------------------+------------------+--------------------+--------------+-------+
In this case, row 0 satisfies both the max('col3') and the max('col1') condition.
So what needs to happen is:
Row 0 becomes 'K';
Row 3 becomes 'Z' (because among the remaining rows, with row 0 already 'K', row 3 satisfies the max('col1') condition);
Row 1 becomes 'U';
Row 2 becomes 'I'.
I cannot have more than one row with 'I' in table 1.
Compute the aggregates:
from pyspark.sql import functions as F
df = spark.createDataFrame([
(2.1729247374294496, 3.558069532647046, 6.607603368496324, 1),
(0.2654841575294071, 1.2633077949463256, 0.023578679968183733, 0),
(0.4253301781296708, 3.4566490739823483, 0.11711202266039554, 3),
(2.608497168338446, 3.529397129549324, 0.373034222141551, 2)
], ("col1", "col2", "col3", "x"))
min1, max1, max3 = df.select(F.min("col1"), F.max("col1"), F.max("col3")).first()
Add the column with `when`:
y = (F.when(F.col("col3") == max3, "K") # In row where col3 == max(col3), change Y from null to 'K'
.when(F.col("col1") == max1, "Z") # In the remaining rows, in the row where col1 == max(col1), change Y from null to 'Z'
.when(F.col("col1") == min1, "U") # In the remaining rows, in the row where col1 == min(col1), change Y from null to 'U'
.otherwise("I")) # In the remaining row: change Y from null to 'I'
df_with_y = df.withColumn("y", y)
df_with_y.show()
# +------------------+------------------+--------------------+---+---+
# | col1| col2| col3| x| y|
# +------------------+------------------+--------------------+---+---+
# |2.1729247374294496| 3.558069532647046| 6.607603368496324| 1| K|
# |0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| U|
# |0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| I|
# | 2.608497168338446| 3.529397129549324| 0.373034222141551| 2| Z|
# +------------------+------------------+--------------------+---+---+
The values of the new column are then looked up in column y of the first table, joining on column x:
# df2 is the second table (which also has an x column). Note the column
# added above is named "y" (lowercase); Spark resolves column names
# case-insensitively by default, but using the actual name is clearer.
df_with_y.select("x", "y").join(df2, ["x"])
If `y` already exists and you want to preserve its non-null values:
df_ = spark.createDataFrame([
(2.1729247374294496, 3.558069532647046, 6.607603368496324, 1, "G"),
(0.2654841575294071, 1.2633077949463256, 0.023578679968183733, 0, None),
(0.4253301781296708, 3.4566490739823483, 0.11711202266039554, 3, None),
(2.608497168338446, 3.529397129549324, 0.373034222141551, 2, None)
], ("col1", "col2", "col3", "x", "y"))
# Compute the aggregates only over the rows that are not labelled yet
# (note: df_, not df -- df has no y column to filter on)
min1_, max1_, max3_ = df_.filter(F.col("y").isNull()).select(F.min("col1"), F.max("col1"), F.max("col3")).first()
y_ = (F.when(F.col("col3") == max3_, "K")
.when(F.col("col1") == max1_, "Z")
.when(F.col("col1") == min1_, "U")
.otherwise("I"))
df_.withColumn("y", F.coalesce(F.col("y"), y_)).show()
# +------------------+------------------+--------------------+---+---+
# | col1| col2| col3| x| y|
# +------------------+------------------+--------------------+---+---+
# |2.1729247374294496| 3.558069532647046| 6.607603368496324| 1| G|
# |0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| U|
# |0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| I|
# | 2.608497168338446| 3.529397129549324| 0.373034222141551| 2| K|
# +------------------+------------------+--------------------+---+---+
If you run into numerical precision issues, you can try a tolerance-based comparison:
threshold = 0.0000001 # Choose an appropriate threshold
y_t = (F.when(F.abs(F.col("col3") - max3) < threshold, "K") # In row where col3 == max(col3), change Y from null to 'K'
.when(F.abs(F.col("col1") - max1) < threshold, "Z") # In the remaining rows, in the row where col1 == max(col1), change Y from null to 'Z'
.when(F.abs(F.col("col1") - min1) < threshold, "U") # In the remaining rows, in the row where col1 == min(col1), change Y from null to 'U'
.otherwise("I")) # In the remaining row: change Y from null to 'I'
df.withColumn("y", y_t).show()
# +------------------+------------------+--------------------+---+---+
# | col1| col2| col3| x| y|
# +------------------+------------------+--------------------+---+---+
# |2.1729247374294496| 3.558069532647046| 6.607603368496324| 1| K|
# |0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| U|
# |0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| I|
# | 2.608497168338446| 3.529397129549324| 0.373034222141551| 2| Z|
# +------------------+------------------+--------------------+---+---+
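Note that the `when` chains above test each row independently against aggregates computed once over the whole DataFrame, so in the UPD example (where row 0 holds both max('col3') and max('col1')) no remaining row would match max(col1) and 'Z' would never be assigned. One way to guarantee exactly one row per label is to assign the labels sequentially, recomputing each aggregate over only the rows not yet labelled. A pure-Python sketch of that logic (the same steps can be expressed in Spark by filtering out labelled rows and re-running the aggregate before each assignment):

```python
def assign_labels(rows):
    """Label rows sequentially: 'K' to the max-col3 row, then 'Z' to the
    max-col1 row among the rest, then 'U' to the min-col1 row among the
    rest, and 'I' to whatever remains.
    rows: list of (col1, col2, col3) tuples; returns a list of labels."""
    labels = [None] * len(rows)
    remaining = set(range(len(rows)))

    k = max(remaining, key=lambda i: rows[i][2])  # row with max col3
    labels[k] = "K"; remaining.discard(k)

    z = max(remaining, key=lambda i: rows[i][0])  # max col1 among the rest
    labels[z] = "Z"; remaining.discard(z)

    u = min(remaining, key=lambda i: rows[i][0])  # min col1 among the rest
    labels[u] = "U"; remaining.discard(u)

    for i in remaining:                           # everything left gets 'I'
        labels[i] = "I"
    return labels

# The UPD example: row 0 holds both max(col3) and max(col1)
rows = [
    (2.608497168338446, 3.558069532647046, 6.607603368496324),
    (0.2654841575294071, 1.2633077949463256, 0.023578679968183733),
    (0.4253301781296708, 3.4566490739823483, 0.11711202266039554),
    (2.1729247374294496, 3.529397129549324, 0.373034222141551),
]
print(assign_labels(rows))  # ['K', 'U', 'I', 'Z']
```

Here row 0 takes 'K', and because it is excluded before the 'Z' step, row 3 (the largest remaining col1) correctly takes 'Z', matching the UPD requirement that each label appears exactly once.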