获取Spark数据帧列中最大值的最佳方法

xen*_*yon 51 python apache-spark apache-spark-sql pyspark

我正在试图找出在Spark数据帧列中获得最大值的最佳方法.

请考虑以下示例:

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
Run Code Online (Sandbox Code Playgroud)

这创造了:

+---+---+
|  A|  B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
Run Code Online (Sandbox Code Playgroud)

我的目标是找到A列中的最大值(通过检查,这是3.0).使用PySpark,我可以想到以下四种方法:

# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])

# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']

# Method 3: Use groupby()
df.groupby().max('A').first().asDict()['max(A)']

# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
Run Code Online (Sandbox Code Playgroud)

上面的每一个都给出了正确的答案,但在没有Spark分析工具的情况下,我无法分辨哪个是最好的.

任何关于上述哪种方法在Spark运行时或资源使用方面最有效的直觉或经验主义的想法,或者是否有比上述方法更直接的方法?

Bur*_*urt 50

>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor|           timestamp|     uid|         x|          y|
+-----+--------------------+--------+----------+-----------+
|    1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
|    1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
|    1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
|    1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|

>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613
Run Code Online (Sandbox Code Playgroud)

答案与方法3几乎相同.但似乎可以删除method3中的"asDict()"

  • 虽然 `.collect()[0]` 有效,但使用 `.first()[0]` 可能更安全。根据定义,[collect()](https://spark.apache.org/docs/latest/rdd-programming-guide.html#printing-elements-of-an-rdd)将“返回数据集的所有元素作为驱动程序中的数组。”,**这是一台机器**。如果语法错误,您最终可能会使用过多的内存。 (7认同)
  • @jibiel`collection()`返回一个列表(在本例中为单个项),因此您需要访问列表中的第一个(唯一)项 (2认同)
  • 如果`collect()[0]`,则可以使用@Burt `head()`。 (2认同)

小智 19

可以使用以下方法实现数据帧特定列的最大值:

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]

  • 与已接受的解决方案相比,我更喜欢您的解决方案。添加两个“[0]”仅给出结果 (2认同)

Dan*_*kyy 12

备注:Spark旨在用于大数据 - 分布式计算.示例DataFrame的大小非常小,因此可以根据小的示例更改现实示例的顺序.

最慢:Method_1,因为.describe("A")计算min,max,mean,stddev和count(整个列上的5次计算)

中:Method_4,因为.rdd(DF到RDD转换)会减慢进程.

更快:Method_3~S method_2~method_5,因为逻辑非常相似,所以Spark的催化剂优化器遵循非常相似的逻辑,操作次数最少(获取特定列的最大值,收集单值数据帧); (.asDict()增加了一点时间比较3,2到5)

import pandas as pd
import time

time_dict = {}

dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#--  For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = self.sqlContext.createDataFrame(pdf)

tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)

tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)

tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)

tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)

tic5 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)

print time_dict
Run Code Online (Sandbox Code Playgroud)

以毫秒(ms)为单位的集群边缘节点上的结果:

小DF(ms):{'m1':7096,'m2':205,'m3':165,'m4':211,'m5':180}

更大的DF(ms):{'m1':10260,'m2':452,'m3':465,'m4':916,'m5':373}


lum*_*men 11

另一种方式:

df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
Run Code Online (Sandbox Code Playgroud)

根据我的数据,我得到了这个基准:

df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms
Wall time: 3.7 s

df.select("A").rdd.max()[0]
CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms
Wall time: 10.3 s

df.agg({"A": "max"}).collect()[0][0]
CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms
Wall time: 3.75 s
Run Code Online (Sandbox Code Playgroud)

所有人都给出了相同的答案

  • “df.limit(1).collect()[0]”可以替换为“df.first()” (3认同)

Nan*_*esh 11

下面的示例显示了如何获取 Spark 数据帧列中的最大值。

from pyspark.sql.functions import max

df = sql_context.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
+---+---+
|  A|  B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+

result = df.select([max("A")]).show()
result.show()
+------+
|max(A)|
+------+
|   3.0|
+------+

print result.collect()[0]['max(A)']
3.0
Run Code Online (Sandbox Code Playgroud)

类似地,可以按如下所示计算 min、mean 等:

from pyspark.sql.functions import mean, min, max

result = df.select([mean("A"), min("A"), max("A")])
result.show()
+------+------+------+
|avg(A)|min(A)|max(A)|
+------+------+------+
|   2.0|   1.0|   3.0|
+------+------+------+
Run Code Online (Sandbox Code Playgroud)


小智 8

首先添加导入行:

from pyspark.sql.functions import min, max

要在数据框中找到年龄的最小值:

df.agg(min("age")).show()

+--------+
|min(age)|
+--------+
|      29|
+--------+
Run Code Online (Sandbox Code Playgroud)

要在数据框中找到年龄的最大值:

df.agg(max("age")).show()

+--------+
|max(age)|
+--------+
|      77|
+--------+
Run Code Online (Sandbox Code Playgroud)


pro*_*ray 6

我使用了这个链中已经存在的另一个解决方案(由@satprem Rath)。

要在数据框中找到年龄的最小值:

df.agg(min("age")).show()

+--------+
|min(age)|
+--------+
|      29|
+--------+
Run Code Online (Sandbox Code Playgroud)

编辑:添加更多上下文。

虽然上述方法打印了结果,但我在将结果分配给变量以供以后重用时遇到了问题。

因此,要仅获取int分配给变量的值:

from pyspark.sql.functions import max, min  

maxValueA = df.agg(max("A")).collect()[0][0]
maxValueB = df.agg(max("B")).collect()[0][0]
Run Code Online (Sandbox Code Playgroud)

  • 请围绕您的解决方案添加一些上下文和解释。 (4认同)

Blu*_*uds 5

要获得价值,请使用其中任何一个

  1. df1.agg({"x": "max"}).collect()[0][0]
  2. df1.agg({"x": "max"}).head()[0]
  3. df1.agg({"x": "max"}).first()[0]

或者我们可以做这些“分钟”

from pyspark.sql.functions import min, max
df1.agg(min("id")).collect()[0][0]
df1.agg(min("id")).head()[0]
df1.agg(min("id")).first()[0]
Run Code Online (Sandbox Code Playgroud)