ortools中修正的总线调度问题

laz*_*aza 8 python schedule or-tools

我想从 ortools修改总线调度问题,以便每个司机的班次在插槽方面是连续的,如果需要,司机可以同时共享一个班次。

例如,假设我们有以下半小时班次(格式类似于来自 ortools 的原始 bus_scheduling_problem):

shifts = [
[0, '07:00', '07:30', 420, 450, 30],
[1, '07:30', '08:00', 450, 480, 30],
[2, '08:00', '08:30', 480, 510, 30],
[3, '08:30', '09:00', 510, 540, 30],
[4, '09:00', '09:30', 540, 570, 30],
[5, '09:30', '10:00', 570, 600, 30],
[6, '10:00', '10:30', 600, 630, 30],
[7, '10:30', '11:00', 630, 660, 30],
[8, '11:00', '11:30', 660, 690, 30],
[9, '11:30', '12:00', 690, 720, 30],
[10, '12:00', '12:30', 720, 750, 30],
[11, '12:30', '13:00', 750, 780, 30],
[12, '13:00', '13:30', 780, 810, 30],
[13, '13:30', '14:00', 810, 840, 30],
[14, '14:00', '14:30', 840, 870, 30],
[15, '14:30', '15:00', 870, 900, 30],
[16, '15:00', '15:30', 900, 930, 30],
[17, '15:30', '16:00', 930, 960, 30],
[18, '16:00', '16:30', 960, 990, 30],
[19, '16:30', '17:00', 990, 1020, 30],
[20, '17:00', '17:30', 1020, 1050, 30],
[21, '17:30', '18:00', 1050, 1080, 30],
[22, '18:00', '18:30', 1080, 1110, 30],
[23, '18:30', '19:00', 1110, 1140, 30],
[24, '19:00', '19:30', 1140, 1170, 30],
[25, '19:30', '20:00', 1170, 1200, 30],
[26, '20:00', '20:30', 1200, 1230, 30],
[27, '20:30', '21:00', 1230, 1260, 30],
[28, '21:00', '21:30', 1260, 1290, 30],
[29, '21:30', '22:00', 1290, 1320, 30],
[30, '22:00', '22:30', 1320, 1350, 30],
[31, '22:30', '23:00', 1350, 1380, 30],
[32, '23:00', '23:30', 1380, 1410, 30],
[33, '23:30', '24:00', 1410, 1440, 30]
]
Run Code Online (Sandbox Code Playgroud)

我成功地执行了这个版本的 bus_scheduling 代码,我发现我需要 2 个驱动程序来满足上述时间表的需求。工作时间范围从07:00 am to 24:00 (midnight).

因此,如果我们有 2 名巴士司机来安排这个时间表,我更愿意根据 12 小时司机轮班来涵盖每日值班的分配如下:

Driver 1: 07:00 - 19:00 with a break at 13:00
Driver 2: 12:00 - 24:00 with a break at 14:00 (basically no overlap with Driver 1's break)
Run Code Online (Sandbox Code Playgroud)

我所说的连续小时的意思是,以 的形式满足12 小时驾驶员轮班解决方案的解决方案07:00-11:00 + 14:00-15:00 + 17:00-24:00应该被接受。更多的司机解决方案还应该确保符不重叠,从而不会所有的司机都在休息。此外,由于工作量大,休息槽可能会被堵塞。

我在or-tools 讨论中得到了一个答案,说我需要在每个节点上维护自轮班开始以来的总时间,但是我在编码时遇到了困难,假设它解决了我的问题。

Ana*_*lii 3

对我来说,ortools 的总线调度问题对您的任务来说是一个过大的问题,因为您提到轮班持续时间总是30几分钟,并且不需要设置/清理时间。此外,司机必须严格11按时工作并有连续的休息时间。相反,我们可以编写一个类似于护士调度问题的脚本,这可能更容易理解(对我来说,这是第一次使用or-tools编写一些东西,而且很清楚)。

准备

首先,总班次可以计算如下:

num_shifts = len(shifts)
Run Code Online (Sandbox Code Playgroud)

所需司机人数:

num_drivers = ceil(float(num_shifts) / working_time)
Run Code Online (Sandbox Code Playgroud)

在您的情况下,司机必须准确驾驶几个11小时,因此需要22轮班(每个班次固定为30分钟):

working_time = 22
Run Code Online (Sandbox Code Playgroud)

休息时间是1这样的:

break_time = 2
Run Code Online (Sandbox Code Playgroud)

4正如您在评论中提到的,每个司机在驾驶数小时后必须休息,但不得晚于8下班后:

break_interval = [8, 16]
Run Code Online (Sandbox Code Playgroud)

司机可以开始工作的最新班次:

latest_start_shift = num_shifts - working_time - break_time
Run Code Online (Sandbox Code Playgroud)

确实,如果他/她晚点开始工作,那么司机就不会完成完整的工作时间。

搭建模型

让我们为司机定义一个轮班数组:

driver_shifts = {}
for driver_id in range(num_drivers):
    for shift_id in range(num_shifts):
        driver_shifts[(driver_id, shift_id)] = model.NewBoolVar('driver%ishift%i' % (driver_id, shift_id))
Run Code Online (Sandbox Code Playgroud)

driver_shifts[(d, s)]1如果将 shifts分配给 driver则等于d0否则。

此外,为驾驶员创建一系列开始轮班:

start_time = {}
for driver_id in range(num_drivers):
    for shift_id in range(latest_start_shift + 1):
        start_time[(driver_id, shift_id)] = model.NewBoolVar('driver%istart%i' % (driver_id, shift_id))
Run Code Online (Sandbox Code Playgroud)

start_time[(d, s)]1如果司机d在 班次 开始一个工作日s,则等于0,否则。

司机每天正好开车 11 个小时

每位驾驶员必须在一天内严格按照规定的驾驶时间行驶:

for driver_id in range(num_drivers):
    model.Add(sum(driver_shifts[(driver_id, shift_id)] for shift_id in range(num_shifts)) == working_time)
Run Code Online (Sandbox Code Playgroud)

然而,这还不够,因为驾驶员必须连续进行,中间要有一次休息。稍后我们将看到如何执行此操作。

所有轮班均由司机负责

每个班次必须由至少一名驾驶司机负责:

for shift_id in range(num_shifts):
    model.Add(sum(driver_shifts[(driver_id, shift_id)] for driver_id in range(num_drivers)) >= 1)
Run Code Online (Sandbox Code Playgroud)

司机连续行驶

这里就start_time发挥作用了。基本思想是,对于驾驶员的每个可能的开始时间,我们强制驾驶员在该时间下班(实际上,驾驶员每天只能开始工作一次!)。

因此,司机每天只能开始工作一次:

for driver_id in range(num_drivers):
    model.Add(sum(start_time[(driver_id, start_shift_id)] for start_shift_id in range(latest_start_shift + 1)) == 1)
Run Code Online (Sandbox Code Playgroud)

对于驱动器的每次启动时间,连续内的工作时间working_time + break_timeworking_time

for driver_id in range(num_drivers):
     for start_shift_id in range(latest_start_shift + 1):
         model.Add(sum(driver_shifts[(driver_id, shift_id)] for shift_id in
                   range(start_shift_id, start_shift_id + working_time + break_time)) == working_time) \
             .OnlyEnforceIf(start_time[(driver_id, start_shift_id)]) 
Run Code Online (Sandbox Code Playgroud)

中断是连续的

为此,我们需要一个额外的数组来表示具有给定工作班次开始的给break_ind[(d, s, b)]定驾驶员是否在班次休息。因此,在这种情况下,值应该是休息时间的值:dsbdriver_shifts0

     l = start_shift_id + break_interval[0]
     r = start_shift_id + break_interval[1]
     for s in range(l, r):
        break_ind[(driver_id, start_shift_id, s)] = model.NewBoolVar("d%is%is%i"%(driver_id, start_shift_id, s))
        model.Add(sum(driver_shifts[(driver_id, s1)] for s1 in range(s, s + break_time)) == 0)\
        .OnlyEnforceIf(start_time[(driver_id, start_shift_id)])\
        .OnlyEnforceIf(break_ind[(driver_id, start_shift_id, s)]) 
Run Code Online (Sandbox Code Playgroud)

此外,驾驶员每天只能休息一次:

model.Add(sum(break_ind[(driver_id, start_shift_id, s)] for s in range(l, r)) == 1)
Run Code Online (Sandbox Code Playgroud)

完整代码

您可以在下面或此处查看完整的代码(我添加了它以供将来参考)。在那里,您还可以找到针对驾驶员不休息情况的简化版本。

from ortools.sat.python import cp_model
from math import ceil

shifts = [
    [0, '07:00', '07:30', 420, 450, 30],
    [1, '07:30', '08:00', 450, 480, 30],
    [2, '08:00', '08:30', 480, 510, 30],
    [3, '08:30', '09:00', 510, 540, 30],
    [4, '09:00', '09:30', 540, 570, 30],
    [5, '09:30', '10:00', 570, 600, 30],
    [6, '10:00', '10:30', 600, 630, 30],
    [7, '10:30', '11:00', 630, 660, 30],
    [8, '11:00', '11:30', 660, 690, 30],
    [9, '11:30', '12:00', 690, 720, 30],
    [10, '12:00', '12:30', 720, 750, 30],
    [11, '12:30', '13:00', 750, 780, 30],
    [12, '13:00', '13:30', 780, 810, 30],
    [13, '13:30', '14:00', 810, 840, 30],
    [14, '14:00', '14:30', 840, 870, 30],
    [15, '14:30', '15:00', 870, 900, 30],
    [16, '15:00', '15:30', 900, 930, 30],
    [17, '15:30', '16:00', 930, 960, 30],
    [18, '16:00', '16:30', 960, 990, 30],
    [19, '16:30', '17:00', 990, 1020, 30],
    [20, '17:00', '17:30', 1020, 1050, 30],
    [21, '17:30', '18:00', 1050, 1080, 30],
    [22, '18:00', '18:30', 1080, 1110, 30],
    [23, '18:30', '19:00', 1110, 1140, 30],
    [24, '19:00', '19:30', 1140, 1170, 30],
    [25, '19:30', '20:00', 1170, 1200, 30],
    [26, '20:00', '20:30', 1200, 1230, 30],
    [27, '20:30', '21:00', 1230, 1260, 30],
    [28, '21:00', '21:30', 1260, 1290, 30],
    [29, '21:30', '22:00', 1290, 1320, 30],
    [30, '22:00', '22:30', 1320, 1350, 30],
    [31, '22:30', '23:00', 1350, 1380, 30],
    [32, '23:00', '23:30', 1380, 1410, 30],
    [33, '23:30', '24:00', 1410, 1440, 30]
]

class VarArraySolutionPrinter(cp_model.CpSolverSolutionCallback):
    def __init__(self, driver_shifts, num_drivers, num_shifts, solutions):
        cp_model.CpSolverSolutionCallback.__init__(self)
        self.driver_shifts = driver_shifts
        self.num_drivers = num_drivers
        self.num_shifts = num_shifts
        self.solutions = solutions
        self.solution_id = 0

    def on_solution_callback(self):
        if self.solution_id in self.solutions:
            self.solution_id += 1
            print ("Solution found!")
            for driver_id in range(self.num_drivers):
                print ("*************Driver#%s*************" % driver_id)
                for shift_id in range(self.num_shifts):
                    if (self.Value(self.driver_shifts[(driver_id, shift_id)])):
                        print('Shift from %s to %s' %
                              (shifts[shift_id][1],
                               shifts[shift_id][2]))
            print()

    def solution_count(self):
        return self.solution_id

solver = cp_model.CpSolver()
model = cp_model.CpModel()

num_shifts = len(shifts)

working_time = 22
break_time = 2

# when take a break within the working time
break_interval = [8, 16]

latest_start_shift = num_shifts - working_time - break_time
num_drivers = ceil(float(num_shifts) / working_time)

# create an array of assignments of drivers
driver_shifts = {}
for driver_id in range(num_drivers):
    for shift_id in range(num_shifts):
        driver_shifts[(driver_id, shift_id)] = model.NewBoolVar('driver%ishift%i' % (driver_id, shift_id))

# driver must work exactly {working_time} shifts
for driver_id in range(num_drivers):
    model.Add(sum(driver_shifts[(driver_id, shift_id)] for shift_id in range(num_shifts)) == working_time)

# each shift must be covered by at least one driver
for shift_id in range(num_shifts):
    model.Add(sum(driver_shifts[(driver_id, shift_id)] for driver_id in range(num_drivers)) >= 1)

# create an array of start times for drivers
start_time = {}
for driver_id in range(num_drivers):
    for shift_id in range(latest_start_shift + 1):
        start_time[(driver_id, shift_id)] = model.NewBoolVar('driver%istart%i' % (driver_id, shift_id))

break_ind = {}
for driver_id in range(num_drivers):
     for start_shift_id in range(latest_start_shift + 1):
         model.Add(sum(driver_shifts[(driver_id, shift_id)] for shift_id in
                   range(start_shift_id, start_shift_id + working_time + break_time)) == working_time) \
             .OnlyEnforceIf(start_time[(driver_id, start_shift_id)])

         l = start_shift_id + break_interval[0]
         r = start_shift_id + break_interval[1]
         for s in range(l, r):
            break_ind[(driver_id, start_shift_id, s)] = model.NewBoolVar("d%is%is%i"%(driver_id, start_shift_id, s))
            model.Add(sum(driver_shifts[(driver_id, s1)] for s1 in range(s, s + break_time)) == 0)\
            .OnlyEnforceIf(start_time[(driver_id, start_shift_id)])\
            .OnlyEnforceIf(break_ind[(driver_id, start_shift_id, s)])
         model.Add(sum(break_ind[(driver_id, start_shift_id, s)] for s in range(l, r)) == 1)

for driver_id in range(num_drivers):
    model.Add(sum(start_time[(driver_id, start_shift_id)] for start_shift_id in range(latest_start_shift + 1)) == 1)

solution_printer = VarArraySolutionPrinter(driver_shifts, num_drivers, num_shifts, range(2))
status = solver.SearchForAllSolutions(model, solution_printer)
Run Code Online (Sandbox Code Playgroud)