Lau*_*der 11 python hungarian-algorithm apache-spark pyspark scipy-optimize
我有一个分配问题,我想向 SO 社区询问为我的 Spark 数据帧实现此任务的最佳方法(使用 Spark 3.1+)。我将首先描述问题,然后再进行实施。
问题是:我最多有 N 个任务和 N 个个人(在这个问题的情况下,N=10)。每个人执行每项任务都有一定的成本,其中最小成本为 0 美元,最大成本为 10 美元。这是一种匈牙利算法问题,有一些注意事项。
multiTask=True(不能超过 1 个multiTask,也可能没有)。如果一个worker的成本低于x多任务,他会被自动分配给多任务,并且在优化过程中该多任务被认为已被占用。
这是 Spark 数据框的样子。注意:为了简单起见,我展示了一个示例,其中 N=3(3 个任务,3 个个人)。
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([
Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=220, cost=1.50, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=110, cost=2.90, isMultiTask=True),
Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=190, cost=0.80, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=220, cost=1.80, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=110, cost=0.90, isMultiTask=True),
Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=190, cost=9.99, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=220, cost=1.20, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=110, cost=0.25, isMultiTask=True),
Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=190, cost=4.99, isMultiTask=False)
])
df = spark.createDataFrame(rdd)
Run Code Online (Sandbox Code Playgroud)
您将看到有一个日期/位置,因为我需要解决每个日期/位置分组的分配问题。我计划通过根据每个工作人员和任务的 ID 分配一个“索引”来解决这个问题,dense_rank()然后使用 pandas UDF,根据索引填充 N x N numpy 数组,然后调用该linear_sum_assignment函数。然而,由于我在多任务中提出的第二个边缘情况,我不相信这个计划会起作用。
worker_order_window = Window.partitionBy("date", "locationId").orderBy("workerId")
task_order_window = Window.partitionBy("date", "locationId").orderBy("taskId")
# get the dense_rank because will use this to assign a worker ID an index for the np array for linear_sum_assignment
# dense_rank - 1 as arrays are 0 indexed
df = df.withColumn("worker_idx", dense_rank().over(worker_order_window) - 1)
df = df.withColumn("task_idx", dense_rank().over(task_order_window) - 1)
def linear_assignment_udf(pandas_df: pd.DataFrame) -> pd.DataFrame:
df_dict = pandas_df.to_dict('records')
# in case there are less than N rows/columns
N = max(pandas_df.shape[0], pandas_df.shape[1])
arr = np.zeros((N,N))
for row in df_dict:
# worker_idx will be the row number, task idx will be the col number
worker_idx = row.get('worker_idx')
task_idx = row.get('task_idx')
arr[worker_idx][task_idx] = row.get('cost')
rids, cids = linear_sum_assignment(n)
return_list = []
# now want to return a dataframe that says which task_idx a worker has
for r, c in zip(rids, cids):
for d in df_dict:
if d.get('worker_idx') == r:
d['task_assignment'] = c
return_list.append(d)
return pd.DataFrame(return_list)
schema = StructType.fromJson(df.schema.jsonValue()).add('task_assignment', 'integer')
df = df.groupBy("date", "locationId").applyInPandas(linear_assignment_udf, schema)
df = df.withColumn("isAssigned", when(col("task_assignment") == col("task_idx"), True).otherwise(False))
Run Code Online (Sandbox Code Playgroud)
正如你所看到的,这个案例根本没有涵盖多任务。我想以最有效的方式解决这个问题,这样我就不会受到 pandas udf 或 scipy 的束缚。
我对您正在使用的库一无所知,因此我无法帮助您编写代码,但我认为您应该分两步执行此操作:
基本的匈牙利算法仅适用于平方成本矩阵,看起来您已经通过用 0 填充成本矩阵来正确处理了该问题,但是对适用于矩形矩阵的算法进行了修改。您可能想看看是否可以使用这些替代方案之一,因为它可能会快得多。
| 归档时间: |
|
| 查看次数: |
897 次 |
| 最近记录: |