Hem*_*ant 12 python apache-spark pyspark
I am working with a PySpark DataFrame with n columns. I have a set of m columns (m < n), and my task is to pick the column containing the maximum values.
For example:
Input: a PySpark DataFrame containing col_1 = [1,2,3], col_2 = [2,1,4], col_3 = [3,2,5].
Output in this example: col_4 = max(col_1, col_2, col_3) = [3,2,5].
There is something similar in pandas, as explained in this question.
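For reference, a minimal pandas sketch of that approach, using the example data above:
import pandas as pd

pdf = pd.DataFrame({'col_1': [1, 2, 3], 'col_2': [2, 1, 4], 'col_3': [3, 2, 5]})
# Row-wise maximum across the selected columns -> [3, 2, 5]
pdf['col_4'] = pdf[['col_1', 'col_2', 'col_3']].max(axis=1)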
Is there any way to do this in PySpark, or should I convert my PySpark DataFrame to a pandas DataFrame and then perform the operation?
zer*_*323 19
You can use reduce with SQL expressions over a list of columns:
from pyspark.sql.functions import col, when
from functools import reduce

def row_max(*cols):
    # Pairwise fold: at each step keep the larger of the two columns
    return reduce(
        lambda x, y: when(x > y, x).otherwise(y),
        [col(c) if isinstance(c, str) else c for c in cols]
    )

df = (sc.parallelize([(1, 2, 3), (2, 1, 2), (3, 4, 5)])
    .toDF(["a", "b", "c"]))

df.select(row_max("a", "b", "c").alias("max"))
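Note that this when/otherwise fold does not skip nulls: a null in either column makes the comparison null, so the result may be null, whereas greatest (below) ignores null values.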
Spark 1.5+ also provides least and greatest:
from pyspark.sql.functions import greatest
df.select(greatest("a", "b", "c"))
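The result column will be named after the expression; add an alias if you want a cleaner name, e.g.:
df.select(greatest("a", "b", "c").alias("max"))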
If you want to keep the name of the maximum column, you can use structs:
from pyspark.sql.functions import struct, lit

def row_max_with_name(*cols):
    # Wrap each column as a (value, column-name) struct; greatest compares
    # structs field by field, so the value decides which struct wins
    cols_ = [struct(col(c).alias("value"), lit(c).alias("col")) for c in cols]
    return greatest(*cols_).alias("greatest({0})".format(",".join(cols)))

maxs = df.select(row_max_with_name("a", "b", "c").alias("maxs"))
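The value and its column name can then be unpacked from the struct (a short usage sketch):
maxs.select(col("maxs")["value"].alias("value"), col("maxs")["col"].alias("col")).show()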
And finally you can use the above to select the "top" column, i.e. the column that most often holds the row maximum:
from pyspark.sql.functions import max

# Count how often each column holds the row maximum and pick the most frequent one
((_, c), ) = (maxs
    .groupBy(col("maxs")["col"].alias("col"))
    .count()
    .agg(max(struct(col("count"), col("col"))))
    .first())

df.select(c)
ans*_*sev 14
We can use greatest.
Create the DataFrame:
df = spark.createDataFrame(
    [[1,2,3], [2,1,2], [3,4,5]],
    ['col_1','col_2','col_3']
)
df.show()
+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
| 1| 2| 3|
| 2| 1| 2|
| 3| 4| 5|
+-----+-----+-----+
Solution:
from pyspark.sql.functions import greatest
df2 = df.withColumn('max_by_rows', greatest('col_1', 'col_2', 'col_3'))

# Only if you prefer col() expressions:
# from pyspark.sql.functions import col
# df2 = df.withColumn('max', greatest(col('col_1'), col('col_2'), col('col_3')))
df2.show()
+-----+-----+-----+-----------+
|col_1|col_2|col_3|max_by_rows|
+-----+-----+-----+-----------+
| 1| 2| 3| 3|
| 2| 1| 2| 2|
| 3| 4| 5| 5|
+-----+-----+-----+-----------+
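If the m columns are kept in a Python list, greatest accepts the unpacked list as well (cols is an assumed name here):
cols = ['col_1', 'col_2', 'col_3']
df2 = df.withColumn('max_by_rows', greatest(*cols))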
Run Code Online (Sandbox Code Playgroud)
You can also use the built-in PySpark least:
from pyspark.sql.functions import least, col
df = df.withColumn('min', least(col('col_1'), col('col_2'), col('col_3')))
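Note that both greatest and least take at least two columns and skip null values.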