在 pyspark 中实现递归算法以查找数据帧内的配对

Lau*_*der 7 python apache-spark apache-spark-sql pyspark

我有一个 spark 数据框 ( prof_student_df),其中列出了学生/教授对的时间戳。每个时间戳有 4 位教授和 4 位学生,每个教授-学生对都有一个“分数”(因此每个时间段有 16 行)。对于每个时间范围,我需要找到教授/学生之间的一对一配对,以最大限度地提高总分。每个教授只能在一个时间范围内与一名学生配对。

例如,这是一个时间范围内的配对/分数。

+------------+--------------+------------+-------+----------+
|    time    | professor_id | student_id | score | is_match |
+------------+--------------+------------+-------+----------+
| 1596048041 | p1           | s1         |   0.7 | FALSE    |
| 1596048041 | p1           | s2         |   0.5 | TRUE     |
| 1596048041 | p1           | s3         |   0.3 | FALSE    |
| 1596048041 | p1           | s4         |   0.2 | FALSE    |
| 1596048041 | p2           | s1         |   0.9 | TRUE     |
| 1596048041 | p2           | s2         |   0.1 | FALSE    |
| 1596048041 | p2           | s3         |  0.15 | FALSE    |
| 1596048041 | p2           | s4         |   0.2 | FALSE    |
| 1596048041 | p3           | s1         |   0.2 | FALSE    |
| 1596048041 | p3           | s2         |   0.3 | FALSE    |
| 1596048041 | p3           | s3         |   0.4 | FALSE    |
| 1596048041 | p3           | s4         |   0.8 | TRUE     |
| 1596048041 | p4           | s1         |   0.2 | FALSE    |
| 1596048041 | p4           | s2         |   0.3 | FALSE    |
| 1596048041 | p4           | s3         |  0.35 | TRUE     |
| 1596048041 | p4           | s4         |   0.4 | FALSE    |
+------------+--------------+------------+-------+----------+
Run Code Online (Sandbox Code Playgroud)

目标是获得这个 is_match 列。它可以是布尔值或 0/1 位或任何有效的。

在上面的例子中,p1 与 s2 匹配,p2 与 s1 匹配,p3 与 s4 匹配,p4 与 s3 匹配,因为这是使总分最大化的组合(产生 2.55 的分数)。有一种奇怪的边缘情况——在给定的时间范围内,教授或学生可能少于 4 人。如果有 4 位教授和 3 位学生,那么 1 位教授将没有配对,并且他的所有 is_match 都是假的。同样,如果有 3 个教授和 4 个学生,则 1 个学生将没有配对,并且他的所有 is_match 都是假的。

有谁知道我怎么可能做到这一点?我想我会按时间分区或分组,然后将数据输入到一些 UDF 中,该 UDF 会吐出配对,然后也许我必须将其加入到原始行中(尽管我不确定)。我正在尝试在 pyspark 中实现这个逻辑,并且可以使用 spark sql/sql 或 pyspark。

理想情况下,我希望这尽可能高效,因为会有数百万行。在问题中,我提到了递归算法,因为这是一个传统的递归类型问题,但如果有不使用递归的更快解决方案,我对此持开放态度。

非常感谢,我是火花的新手,并且对如何做到这一点感到有些困惑。

编辑:澄清我在示例中意识到的问题,我没有在一天内指定这一点,最多有 14 位教授和 14 位学生可供选择。我一次只看一天,这就是为什么我在数据框中没有日期的原因。在任何一个时间范围内,最多有 4 位教授和 4 位学生。此数据框仅显示一个时间范围。但接下来的时间框架很可能是教授4人是p5p1p7p9或类似的东西。学生可能仍然是s1, s2, s3, s4

jxc*_*jxc 5

编辑:正如评论中所讨论的,要解决您更新中提到的问题,我们可以每次使用dense_rank 将student_id 转换为广义序列id,执行步骤1 到3(使用student 列),然后使用join 将student转换为每一次回到原来的student_id数据。见下文Step-0Step-4。如果 timeUnit 中的教授少于 4 个,则 Numpy-end 中的维度将调整为 4(使用 np_vstack() 和 np_zeros()),请参阅更新的函数find_assigned

您可以尝试pandas_udfscipy.optimize.linear_sum_assignment(注意:后端方法是@cronoik在主要评论中提到的匈牙利算法),见下文:

from pyspark.sql.functions import pandas_udf, PandasUDFType, first, expr, dense_rank
from pyspark.sql.types import StructType
from scipy.optimize import linear_sum_assignment
from pyspark.sql import Window
import numpy as np

df = spark.createDataFrame([
    ('1596048041', 'p1', 's1', 0.7), ('1596048041', 'p1', 's2', 0.5), ('1596048041', 'p1', 's3', 0.3),
    ('1596048041', 'p1', 's4', 0.2), ('1596048041', 'p2', 's1', 0.9), ('1596048041', 'p2', 's2', 0.1),
    ('1596048041', 'p2', 's3', 0.15), ('1596048041', 'p2', 's4', 0.2), ('1596048041', 'p3', 's1', 0.2),
    ('1596048041', 'p3', 's2', 0.3), ('1596048041', 'p3', 's3', 0.4), ('1596048041', 'p3', 's4', 0.8),
    ('1596048041', 'p4', 's1', 0.2), ('1596048041', 'p4', 's2', 0.3), ('1596048041', 'p4', 's3', 0.35),
    ('1596048041', 'p4', 's4', 0.4)
] , ['time', 'professor_id', 'student_id', 'score'])

N = 4
cols_student = [*range(1,N+1)]
Run Code Online (Sandbox Code Playgroud)

步骤0:添加一个额外的列student,并创建的所有独特的连击一个新的数据帧DF3 time+ student_id+ student

w1 = Window.partitionBy('time').orderBy('student_id')

df = df.withColumn('student', dense_rank().over(w1))
+----------+------------+----------+-----+-------+                              
|      time|professor_id|student_id|score|student|
+----------+------------+----------+-----+-------+
|1596048041|          p1|        s1|  0.7|      1|
|1596048041|          p2|        s1|  0.9|      1|
|1596048041|          p3|        s1|  0.2|      1|
|1596048041|          p4|        s1|  0.2|      1|
|1596048041|          p1|        s2|  0.5|      2|
|1596048041|          p2|        s2|  0.1|      2|
|1596048041|          p3|        s2|  0.3|      2|
|1596048041|          p4|        s2|  0.3|      2|
|1596048041|          p1|        s3|  0.3|      3|
|1596048041|          p2|        s3| 0.15|      3|
|1596048041|          p3|        s3|  0.4|      3|
|1596048041|          p4|        s3| 0.35|      3|
|1596048041|          p1|        s4|  0.2|      4|
|1596048041|          p2|        s4|  0.2|      4|
|1596048041|          p3|        s4|  0.8|      4|
|1596048041|          p4|        s4|  0.4|      4|
+----------+------------+----------+-----+-------+

df3 = df.select('time','student_id','student').dropDuplicates()
+----------+----------+-------+                                                 
|      time|student_id|student|
+----------+----------+-------+
|1596048041|        s1|      1|
|1596048041|        s2|      2|
|1596048041|        s3|      3|
|1596048041|        s4|      4|
+----------+----------+-------+
Run Code Online (Sandbox Code Playgroud)

步骤 1:使用pivot 找到教授与学生的矩阵,注意我们将分数的负数设置为pivot 的值,以便我们可以使用 scipy.optimize.linear_sum_assignment 来找到分配问题的最小成本:

df1 = df.groupby('time','professor_id').pivot('student', cols_student).agg(-first('score'))
+----------+------------+----+----+-----+----+
|      time|professor_id|   1|   2|    3|   4|
+----------+------------+----+----+-----+----+
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|
+----------+------------+----+----+-----+----+
Run Code Online (Sandbox Code Playgroud)

步骤 2:使用 pandas_udf 和 scipy.optimize.linear_sum_assignment 获取列索引,然后将相应的列名分配给新列assigned

# returnSchema contains one more StringType column `assigned` than schema from the input pdf:
schema = StructType.fromJson(df1.schema.jsonValue()).add('assigned', 'string')

# since the # of students are always N, we can use np.vstack to set the N*N matrix
# below `n` is the number of professors/rows in pdf
# sz is the size of input Matrix, sz=4 in this example
def __find_assigned(pdf, sz):
  cols = pdf.columns[2:]
  n = pdf.shape[0]
  n1 = pdf.iloc[:,2:].fillna(0).values
  _, idx = linear_sum_assignment(np.vstack((n1,np.zeros((sz-n,sz)))))
  return pdf.assign(assigned=[cols[i] for i in idx][:n])

find_assigned = pandas_udf(lambda x: __find_assigned(x,N), schema, PandasUDFType.GROUPED_MAP)

df2 = df1.groupby('time').apply(find_assigned)
+----------+------------+----+----+-----+----+--------+
|      time|professor_id|   1|   2|    3|   4|assigned|
+----------+------------+----+----+-----+----+--------+
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|       3|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|       1|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|       2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|       4|
+----------+------------+----+----+-----+----+--------+
Run Code Online (Sandbox Code Playgroud)

注意:根据@OluwafemiSule 的建议,我们可以使用参数maximize而不是否定分数值。此参数在SciPy 1.4.0+ 中可用:

  _, idx = linear_sum_assignment(np.vstack((n1,np.zeros((N-n,N)))), maximize=True)
Run Code Online (Sandbox Code Playgroud)

Step-3:使用SparkSQL堆栈函数对上述df2进行归一化,取反分数值并过滤score为NULL的行。所需的is_match列应具有assigned==student

df_new = df2.selectExpr(
  'time',
  'professor_id',
  'assigned',
  'stack({},{}) as (student, score)'.format(len(cols_student), ','.join("int('{0}'), -`{0}`".format(c) for c in cols_student))
) \
.filter("score is not NULL") \
.withColumn('is_match', expr("assigned=student"))

df_new.show()
+----------+------------+--------+-------+-----+--------+
|      time|professor_id|assigned|student|score|is_match|
+----------+------------+--------+-------+-----+--------+
|1596048041|          p4|       3|      1|  0.2|   false|
|1596048041|          p4|       3|      2|  0.3|   false|
|1596048041|          p4|       3|      3| 0.35|    true|
|1596048041|          p4|       3|      4|  0.4|   false|
|1596048041|          p2|       1|      1|  0.9|    true|
|1596048041|          p2|       1|      2|  0.1|   false|
|1596048041|          p2|       1|      3| 0.15|   false|
|1596048041|          p2|       1|      4|  0.2|   false|
|1596048041|          p1|       2|      1|  0.7|   false|
|1596048041|          p1|       2|      2|  0.5|    true|
|1596048041|          p1|       2|      3|  0.3|   false|
|1596048041|          p1|       2|      4|  0.2|   false|
|1596048041|          p3|       4|      1|  0.2|   false|
|1596048041|          p3|       4|      2|  0.3|   false|
|1596048041|          p3|       4|      3|  0.4|   false|
|1596048041|          p3|       4|      4|  0.8|    true|
+----------+------------+--------+-------+-----+--------+
Run Code Online (Sandbox Code Playgroud)

第 4 步:使用 join 将 student 转换回 student_id(如果可能,使用广播 join):

df_new = df_new.join(df3, on=["time", "student"])
+----------+-------+------------+--------+-----+--------+----------+            
|      time|student|professor_id|assigned|score|is_match|student_id|
+----------+-------+------------+--------+-----+--------+----------+
|1596048041|      1|          p1|       2|  0.7|   false|        s1|
|1596048041|      2|          p1|       2|  0.5|    true|        s2|
|1596048041|      3|          p1|       2|  0.3|   false|        s3|
|1596048041|      4|          p1|       2|  0.2|   false|        s4|
|1596048041|      1|          p2|       1|  0.9|    true|        s1|
|1596048041|      2|          p2|       1|  0.1|   false|        s2|
|1596048041|      3|          p2|       1| 0.15|   false|        s3|
|1596048041|      4|          p2|       1|  0.2|   false|        s4|
|1596048041|      1|          p3|       4|  0.2|   false|        s1|
|1596048041|      2|          p3|       4|  0.3|   false|        s2|
|1596048041|      3|          p3|       4|  0.4|   false|        s3|
|1596048041|      4|          p3|       4|  0.8|    true|        s4|
|1596048041|      1|          p4|       3|  0.2|   false|        s1|
|1596048041|      2|          p4|       3|  0.3|   false|        s2|
|1596048041|      3|          p4|       3| 0.35|    true|        s3|
|1596048041|      4|          p4|       3|  0.4|   false|        s4|
+----------+-------+------------+--------+-----+--------+----------+

df_new = df_new.drop("student", "assigned")
Run Code Online (Sandbox Code Playgroud)