Lau*_*der 7 python apache-spark apache-spark-sql pyspark
我有一个 spark 数据框 ( prof_student_df
),其中列出了学生/教授对的时间戳。每个时间戳有 4 位教授和 4 位学生,每个教授-学生对都有一个“分数”(因此每个时间段有 16 行)。对于每个时间范围,我需要找到教授/学生之间的一对一配对,以最大限度地提高总分。每个教授只能在一个时间范围内与一名学生配对。
例如,这是一个时间范围内的配对/分数。
+------------+--------------+------------+-------+----------+
| time | professor_id | student_id | score | is_match |
+------------+--------------+------------+-------+----------+
| 1596048041 | p1 | s1 | 0.7 | FALSE |
| 1596048041 | p1 | s2 | 0.5 | TRUE |
| 1596048041 | p1 | s3 | 0.3 | FALSE |
| 1596048041 | p1 | s4 | 0.2 | FALSE |
| 1596048041 | p2 | s1 | 0.9 | TRUE |
| 1596048041 | p2 | s2 | 0.1 | FALSE |
| 1596048041 | p2 | s3 | 0.15 | FALSE |
| 1596048041 | p2 | s4 | 0.2 | FALSE |
| 1596048041 | p3 | s1 | 0.2 | FALSE |
| 1596048041 | p3 | s2 | 0.3 | FALSE |
| 1596048041 | p3 | s3 | 0.4 | FALSE |
| 1596048041 | p3 | s4 | 0.8 | TRUE |
| 1596048041 | p4 | s1 | 0.2 | FALSE |
| 1596048041 | p4 | s2 | 0.3 | FALSE |
| 1596048041 | p4 | s3 | 0.35 | TRUE |
| 1596048041 | p4 | s4 | 0.4 | FALSE |
+------------+--------------+------------+-------+----------+
Run Code Online (Sandbox Code Playgroud)
目标是获得这个 is_match 列。它可以是布尔值或 0/1 位或任何有效的。
在上面的例子中,p1 与 s2 匹配,p2 与 s1 匹配,p3 与 s4 匹配,p4 与 s3 匹配,因为这是使总分最大化的组合(产生 2.55 的分数)。有一种奇怪的边缘情况——在给定的时间范围内,教授或学生可能少于 4 人。如果有 4 位教授和 3 位学生,那么 1 位教授将没有配对,并且他的所有 is_match 都是假的。同样,如果有 3 个教授和 4 个学生,则 1 个学生将没有配对,并且他的所有 is_match 都是假的。
有谁知道我怎么可能做到这一点?我想我会按时间分区或分组,然后将数据输入到一些 UDF 中,该 UDF 会吐出配对,然后也许我必须将其加入到原始行中(尽管我不确定)。我正在尝试在 pyspark 中实现这个逻辑,并且可以使用 spark sql/sql 或 pyspark。
理想情况下,我希望这尽可能高效,因为会有数百万行。在问题中,我提到了递归算法,因为这是一个传统的递归类型问题,但如果有不使用递归的更快解决方案,我对此持开放态度。
非常感谢,我是火花的新手,并且对如何做到这一点感到有些困惑。
编辑:澄清我在示例中意识到的问题,我没有在一天内指定这一点,最多有 14 位教授和 14 位学生可供选择。我一次只看一天,这就是为什么我在数据框中没有日期的原因。在任何一个时间范围内,最多有 4 位教授和 4 位学生。此数据框仅显示一个时间范围。但接下来的时间框架很可能是教授4人是p5
,p1
,p7
,p9
或类似的东西。学生可能仍然是s1
, s2
, s3
, s4
。
编辑:正如评论中所讨论的,要解决您更新中提到的问题,我们可以每次使用dense_rank 将student_id 转换为广义序列id,执行步骤1 到3(使用student 列),然后使用join 将student转换为每一次回到原来的student_id数据。见下文Step-0和Step-4。如果 timeUnit 中的教授少于 4 个,则 Numpy-end 中的维度将调整为 4(使用 np_vstack() 和 np_zeros()),请参阅更新的函数find_assigned
。
您可以尝试pandas_udf和scipy.optimize.linear_sum_assignment(注意:后端方法是@cronoik在主要评论中提到的匈牙利算法),见下文:
from pyspark.sql.functions import pandas_udf, PandasUDFType, first, expr, dense_rank
from pyspark.sql.types import StructType
from scipy.optimize import linear_sum_assignment
from pyspark.sql import Window
import numpy as np
df = spark.createDataFrame([
('1596048041', 'p1', 's1', 0.7), ('1596048041', 'p1', 's2', 0.5), ('1596048041', 'p1', 's3', 0.3),
('1596048041', 'p1', 's4', 0.2), ('1596048041', 'p2', 's1', 0.9), ('1596048041', 'p2', 's2', 0.1),
('1596048041', 'p2', 's3', 0.15), ('1596048041', 'p2', 's4', 0.2), ('1596048041', 'p3', 's1', 0.2),
('1596048041', 'p3', 's2', 0.3), ('1596048041', 'p3', 's3', 0.4), ('1596048041', 'p3', 's4', 0.8),
('1596048041', 'p4', 's1', 0.2), ('1596048041', 'p4', 's2', 0.3), ('1596048041', 'p4', 's3', 0.35),
('1596048041', 'p4', 's4', 0.4)
] , ['time', 'professor_id', 'student_id', 'score'])
N = 4
cols_student = [*range(1,N+1)]
Run Code Online (Sandbox Code Playgroud)
步骤0:添加一个额外的列student
,并创建的所有独特的连击一个新的数据帧DF3 time
+ student_id
+ student
。
w1 = Window.partitionBy('time').orderBy('student_id')
df = df.withColumn('student', dense_rank().over(w1))
+----------+------------+----------+-----+-------+
| time|professor_id|student_id|score|student|
+----------+------------+----------+-----+-------+
|1596048041| p1| s1| 0.7| 1|
|1596048041| p2| s1| 0.9| 1|
|1596048041| p3| s1| 0.2| 1|
|1596048041| p4| s1| 0.2| 1|
|1596048041| p1| s2| 0.5| 2|
|1596048041| p2| s2| 0.1| 2|
|1596048041| p3| s2| 0.3| 2|
|1596048041| p4| s2| 0.3| 2|
|1596048041| p1| s3| 0.3| 3|
|1596048041| p2| s3| 0.15| 3|
|1596048041| p3| s3| 0.4| 3|
|1596048041| p4| s3| 0.35| 3|
|1596048041| p1| s4| 0.2| 4|
|1596048041| p2| s4| 0.2| 4|
|1596048041| p3| s4| 0.8| 4|
|1596048041| p4| s4| 0.4| 4|
+----------+------------+----------+-----+-------+
df3 = df.select('time','student_id','student').dropDuplicates()
+----------+----------+-------+
| time|student_id|student|
+----------+----------+-------+
|1596048041| s1| 1|
|1596048041| s2| 2|
|1596048041| s3| 3|
|1596048041| s4| 4|
+----------+----------+-------+
Run Code Online (Sandbox Code Playgroud)
步骤 1:使用pivot 找到教授与学生的矩阵,注意我们将分数的负数设置为pivot 的值,以便我们可以使用 scipy.optimize.linear_sum_assignment 来找到分配问题的最小成本:
df1 = df.groupby('time','professor_id').pivot('student', cols_student).agg(-first('score'))
+----------+------------+----+----+-----+----+
| time|professor_id| 1| 2| 3| 4|
+----------+------------+----+----+-----+----+
|1596048041| p4|-0.2|-0.3|-0.35|-0.4|
|1596048041| p2|-0.9|-0.1|-0.15|-0.2|
|1596048041| p1|-0.7|-0.5| -0.3|-0.2|
|1596048041| p3|-0.2|-0.3| -0.4|-0.8|
+----------+------------+----+----+-----+----+
Run Code Online (Sandbox Code Playgroud)
步骤 2:使用 pandas_udf 和 scipy.optimize.linear_sum_assignment 获取列索引,然后将相应的列名分配给新列assigned
:
# returnSchema contains one more StringType column `assigned` than schema from the input pdf:
schema = StructType.fromJson(df1.schema.jsonValue()).add('assigned', 'string')
# since the # of students are always N, we can use np.vstack to set the N*N matrix
# below `n` is the number of professors/rows in pdf
# sz is the size of input Matrix, sz=4 in this example
def __find_assigned(pdf, sz):
cols = pdf.columns[2:]
n = pdf.shape[0]
n1 = pdf.iloc[:,2:].fillna(0).values
_, idx = linear_sum_assignment(np.vstack((n1,np.zeros((sz-n,sz)))))
return pdf.assign(assigned=[cols[i] for i in idx][:n])
find_assigned = pandas_udf(lambda x: __find_assigned(x,N), schema, PandasUDFType.GROUPED_MAP)
df2 = df1.groupby('time').apply(find_assigned)
+----------+------------+----+----+-----+----+--------+
| time|professor_id| 1| 2| 3| 4|assigned|
+----------+------------+----+----+-----+----+--------+
|1596048041| p4|-0.2|-0.3|-0.35|-0.4| 3|
|1596048041| p2|-0.9|-0.1|-0.15|-0.2| 1|
|1596048041| p1|-0.7|-0.5| -0.3|-0.2| 2|
|1596048041| p3|-0.2|-0.3| -0.4|-0.8| 4|
+----------+------------+----+----+-----+----+--------+
Run Code Online (Sandbox Code Playgroud)
注意:根据@OluwafemiSule 的建议,我们可以使用参数maximize
而不是否定分数值。此参数在SciPy 1.4.0+ 中可用:
_, idx = linear_sum_assignment(np.vstack((n1,np.zeros((N-n,N)))), maximize=True)
Run Code Online (Sandbox Code Playgroud)
Step-3:使用SparkSQL堆栈函数对上述df2进行归一化,取反分数值并过滤score为NULL的行。所需的is_match
列应具有assigned==student
:
df_new = df2.selectExpr(
'time',
'professor_id',
'assigned',
'stack({},{}) as (student, score)'.format(len(cols_student), ','.join("int('{0}'), -`{0}`".format(c) for c in cols_student))
) \
.filter("score is not NULL") \
.withColumn('is_match', expr("assigned=student"))
df_new.show()
+----------+------------+--------+-------+-----+--------+
| time|professor_id|assigned|student|score|is_match|
+----------+------------+--------+-------+-----+--------+
|1596048041| p4| 3| 1| 0.2| false|
|1596048041| p4| 3| 2| 0.3| false|
|1596048041| p4| 3| 3| 0.35| true|
|1596048041| p4| 3| 4| 0.4| false|
|1596048041| p2| 1| 1| 0.9| true|
|1596048041| p2| 1| 2| 0.1| false|
|1596048041| p2| 1| 3| 0.15| false|
|1596048041| p2| 1| 4| 0.2| false|
|1596048041| p1| 2| 1| 0.7| false|
|1596048041| p1| 2| 2| 0.5| true|
|1596048041| p1| 2| 3| 0.3| false|
|1596048041| p1| 2| 4| 0.2| false|
|1596048041| p3| 4| 1| 0.2| false|
|1596048041| p3| 4| 2| 0.3| false|
|1596048041| p3| 4| 3| 0.4| false|
|1596048041| p3| 4| 4| 0.8| true|
+----------+------------+--------+-------+-----+--------+
Run Code Online (Sandbox Code Playgroud)
第 4 步:使用 join 将 student 转换回 student_id(如果可能,使用广播 join):
df_new = df_new.join(df3, on=["time", "student"])
+----------+-------+------------+--------+-----+--------+----------+
| time|student|professor_id|assigned|score|is_match|student_id|
+----------+-------+------------+--------+-----+--------+----------+
|1596048041| 1| p1| 2| 0.7| false| s1|
|1596048041| 2| p1| 2| 0.5| true| s2|
|1596048041| 3| p1| 2| 0.3| false| s3|
|1596048041| 4| p1| 2| 0.2| false| s4|
|1596048041| 1| p2| 1| 0.9| true| s1|
|1596048041| 2| p2| 1| 0.1| false| s2|
|1596048041| 3| p2| 1| 0.15| false| s3|
|1596048041| 4| p2| 1| 0.2| false| s4|
|1596048041| 1| p3| 4| 0.2| false| s1|
|1596048041| 2| p3| 4| 0.3| false| s2|
|1596048041| 3| p3| 4| 0.4| false| s3|
|1596048041| 4| p3| 4| 0.8| true| s4|
|1596048041| 1| p4| 3| 0.2| false| s1|
|1596048041| 2| p4| 3| 0.3| false| s2|
|1596048041| 3| p4| 3| 0.35| true| s3|
|1596048041| 4| p4| 3| 0.4| false| s4|
+----------+-------+------------+--------+-----+--------+----------+
df_new = df_new.drop("student", "assigned")
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
731 次 |
最近记录: |