工人/时隙排列/约束过滤算法

Lee*_*e B 6 python scheduling permutation timeslots timetable

希望你能帮我解决这个问题.这对工作没有帮助 - 这是一个非常努力工作的志愿者的慈善机构,他们真的可以使用一个比他们现有的更少混乱/烦人的时间表系统.

如果有人知道一个好的第三方应用程序(当然)自动化这个,那几乎同样好.只是...请不要建议随机的时间表,例如预订教室的东西,因为我认为他们不能这样做.

提前感谢阅读; 我知道这是一个很重要的帖子.我尽我所能尽力记录这一点,并表明我已经自己努力了.

问题

我需要一个工人/时间段调度算法,它为工人生成轮班,符合以下标准:

输入数据

import datetime.datetime as dt

class DateRange:
    def __init__(self, start, end):
        self.start = start
        self.end   = end

class Shift:
    def __init__(self, range, min, max):
        self.range = range
        self.min_workers = min
        self.max_workers = max

tue_9th_10pm = dt(2009, 1, 9,   22, 0)
wed_10th_4am = dt(2009, 1, 10,   4, 0)
wed_10th_10am = dt(2009, 1, 10, 10, 0)

shift_1_times = Range(tue_9th_10pm, wed_10th_4am)
shift_2_times = Range(wed_10th_4am, wed_10th_10am)
shift_3_times = Range(wed_10th_10am, wed_10th_2pm)

shift_1 = Shift(shift_1_times, 2,3)  # allows 3, requires 2, but only 2 available
shift_2 = Shift(shift_2_times, 2,2)  # allows 2
shift_3 = Shift(shift_3_times, 2,3)  # allows 3, requires 2, 3 available

shifts = ( shift_1, shift_2, shift_3 )

joe_avail = [ shift_1, shift_2 ]
bob_avail = [ shift_1, shift_3 ]
sam_avail = [ shift_2 ]
amy_avail = [ shift_2 ]
ned_avail = [ shift_2, shift_3 ]
max_avail = [ shift_3 ]
jim_avail = [ shift_3 ]

joe = Worker('joe', joe_avail)
bob = Worker('bob', bob_avail)
sam = Worker('sam', sam_avail)
ned = Worker('ned', ned_avail)
max = Worker('max', max_avail)
amy = Worker('amy', amy_avail)
jim = Worker('jim', jim_avail)

workers = ( joe, bob, sam, ned, max, amy, jim )
Run Code Online (Sandbox Code Playgroud)

处理

从上面来看,轮班工人是要处理的两个主要输入变量

每个班次都需要最少和最大数量的工人.填补班次的最低要求对于成功至关重要,但如果所有其他方法都失败了,手动填充空白的旋转优于"错误":)主要的算法问题是,当有足够的空间时,应该没有不必要的空白工人可以.

理想情况下,班次的最大工人数量将被填补,但这是相对于其他约束的最低优先级,因此如果必须给出,则应该是这样.

灵活的约束

这些有点灵活,如果找不到"完美"的解决方案,它们的界限可以推动一点.这种灵活性应该是最后的手段,而不是随机利用.理想情况下,灵活性可配置为"fudge_factor"变量或类似变量.

  • 两班之间有一个最短的时间段.因此,例如,工人不应该在同一天安排两班倒.
  • 在给定的时间段内(例如,一个月),工人可以做的最大轮班次数
  • 一个月内可以完成一定数量的特定班次(比如隔夜轮班)

很高兴,但没有必要

如果你能想出一个能够实现上述功能并包含其中任何/所有这些算法的算法,我会非常感激和感激.即使是单独执行这些位的附加脚本也会很棒.

  • 重叠的转变.例如,能够指定两个同时发生的"前台"班次和"后台"班次是很好的.这可以通过用不同的移位数据单独调用程序来完成,除了在给定时间段内关于为多个班次安排人员的约束将被遗漏.

  • 每个工人(而非全球)可指定的工人的最短重新安排时间.例如,如果Joe感到工作过度或处理个人问题,或者是初学者学习绳索,我们可能希望比其他工作人员更少地安排他.

  • 当没有可用的工人适合时,选择员工以填补最小班次数字的一些自动/随机/公平方式.

  • 处理突然取消的一些方法,只是填补空白而不重新安排其他班次.

输出测试

可能该算法应该生成尽可能多的匹配解决方案,其中每个解决方案如下所示:

class Solution:
    def __init__(self, shifts_workers):
        """shifts_workers -- a dictionary of shift objects as keys, and a
        a lists of workers filling the shift as values."""

        assert isinstance(dict, shifts_workers)
        self.shifts_workers = shifts_workers
Run Code Online (Sandbox Code Playgroud)

鉴于上述数据,这是针对单个解决方案的测试功能.我认为这是对的,但我也很感激对它的一些同行评审.

def check_solution(solution):
  assert isinstance(Solution, solution)

  def shift_check(shift, workers, workers_allowed):
      assert isinstance(Shift, shift):
      assert isinstance(list, workers):
      assert isinstance(list, workers_allowed)

      num_workers = len(workers)
      assert num_workers >= shift.min_workers
      assert num_workers <= shift.max_workers

      for w in workers_allowed:
          assert w in workers

  shifts_workers = solution.shifts_workers

  # all shifts should be covered
  assert len(shifts_workers.keys()) == 3
  assert shift1 in shifts_workers.keys()
  assert shift2 in shifts_workers.keys()
  assert shift3 in shifts_workers.keys()

  # shift_1 should be covered by 2 people - joe, and bob
  shift_check(shift_1, shifts_workers[shift_1], (joe, bob))

  # shift_2 should be covered by 2 people - sam and amy
  shift_check(shift_2, shifts_workers[shift_2], (sam, amy))

  # shift_3 should be covered by 3 people - ned, max, and jim
  shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))
Run Code Online (Sandbox Code Playgroud)

尝试

我已经尝试用遗传算法实现这一点,但似乎无法将其调整得恰到好处,所以尽管基本原理似乎只适用于单班制,但它甚至无法解决几个班次和一些班次的简单案例工作人员.

我最近的尝试是生成每个可能的排列作为解决方案,然后减少不符合约束的排列.这似乎工作得更快,并且让我更进一步,但我使用python 2.6的itertools.product()来帮助生成排列,我无法完全正确.如果有很多错误,我不会感到惊讶,老实说,这个问题不适合我的脑袋:)

目前我的代码是两个文件:models.py和rota.py. models.py看起来像:

# -*- coding: utf-8 -*-
class Shift:
    def __init__(self, start_datetime, end_datetime, min_coverage, max_coverage):
        self.start = start_datetime
        self.end = end_datetime
        self.duration = self.end - self.start
        self.min_coverage = min_coverage
        self.max_coverage = max_coverage

    def __repr__(self):
        return "<Shift %s--%s (%r<x<%r)" % (self.start, self.end, self.min_coverage, self.max_coverage)

class Duty:
    def __init__(self, worker, shift, slot):
        self.worker = worker
        self.shift = shift
        self.slot = slot

    def __repr__(self):
        return "<Duty worker=%r shift=%r slot=%d>" % (self.worker, self.shift, self.slot)

    def dump(self,  indent=4,  depth=1):
        ind = " " * (indent * depth)
        print ind + "<Duty shift=%s slot=%s" % (self.shift, self.slot)
        self.worker.dump(indent=indent, depth=depth+1)
        print ind + ">"

class Avail:
    def __init__(self, start_time, end_time):
        self.start = start_time
        self.end = end_time

    def __repr__(self):
        return "<%s to %s>" % (self.start, self.end)

class Worker:
    def __init__(self, name, availabilities):
        self.name = name
        self.availabilities = availabilities

    def __repr__(self):
        return "<Worker %s Avail=%r>" % (self.name, self.availabilities)

    def dump(self,  indent=4,  depth=1):
        ind = " " * (indent * depth)
        print ind + "<Worker %s" % self.name
        for avail in self.availabilities:
            print ind + " " * indent + repr(avail)
        print ind + ">"

    def available_for_shift(self,  shift):
        for a in self.availabilities:
            if shift.start >= a.start and shift.end <= a.end:
                return True
        print "Worker %s not available for %r (Availability: %r)" % (self.name,  shift, self.availabilities)
        return False

class Solution:
    def __init__(self, shifts):
        self._shifts = list(shifts)

    def __repr__(self):
        return "<Solution: shifts=%r>" % self._shifts

    def duties(self):
        d = []
        for s in self._shifts:
            for x in s:
                yield x

    def shifts(self):
        return list(set([ d.shift for d in self.duties() ]))

    def dump_shift(self, s, indent=4, depth=1):
        ind = " " * (indent * depth)
        print ind + "<ShiftList"
        for duty in s:
            duty.dump(indent=indent, depth=depth+1)
        print ind + ">"

    def dump(self, indent=4, depth=1):
        ind = " " * (indent * depth)
        print ind + "<Solution"
        for s in self._shifts:
            self.dump_shift(s, indent=indent, depth=depth+1)
        print ind + ">"

class Env:
    def __init__(self, shifts, workers):
        self.shifts = shifts
        self.workers = workers
        self.fittest = None
        self.generation = 0

class DisplayContext:
    def __init__(self,  env):
        self.env = env

    def status(self, msg, *args):
        raise NotImplementedError()

    def cleanup(self):
        pass

    def update(self):
        pass
Run Code Online (Sandbox Code Playgroud)

和rota.py看起来像:

#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-

from datetime import datetime as dt
am2 = dt(2009,  10,  1,  2, 0)
am8 = dt(2009,  10,  1,  8, 0)
pm12 = dt(2009,  10,  1,  12, 0)

def duties_for_all_workers(shifts, workers):
    from models import Duty

    duties = []

    # for all shifts
    for shift in shifts:
        # for all slots
        for cov in range(shift.min_coverage, shift.max_coverage):
            for slot in range(cov):
                # for all workers
                for worker in workers:
                    # generate a duty
                    duty = Duty(worker, shift, slot+1)
                    duties.append(duty)

    return duties

def filter_duties_for_shift(duties,  shift):
    matching_duties = [ d for d in duties if d.shift == shift ]
    for m in matching_duties:
        yield m

def duty_permutations(shifts,  duties):
    from itertools import product

    # build a list of shifts
    shift_perms = []
    for shift in shifts:
        shift_duty_perms = []
        for slot in range(shift.max_coverage):
            slot_duties = [ d for d in duties if d.shift == shift and d.slot == (slot+1) ]
            shift_duty_perms.append(slot_duties)
        shift_perms.append(shift_duty_perms)

    all_perms = ( shift_perms,  shift_duty_perms )

    # generate all possible duties for all shifts
    perms = list(product(*shift_perms))
    return perms

def solutions_for_duty_permutations(permutations):
    from models import Solution
    res = []
    for duties in permutations:
        sol = Solution(duties)
        res.append(sol)
    return res

def find_clashing_duties(duty, duties):
    """Find duties for the same worker that are too close together"""
    from datetime import timedelta
    one_day = timedelta(days=1)
    one_day_before = duty.shift.start - one_day
    one_day_after = duty.shift.end + one_day
    for d in [ ds for ds in duties if ds.worker == duty.worker ]:
        # skip the duty we're considering, as it can't clash with itself
        if duty == d:
            continue

        clashes = False

        # check if dates are too close to another shift
        if d.shift.start >= one_day_before and d.shift.start <= one_day_after:
            clashes = True

        # check if slots collide with another shift
        if d.slot == duty.slot:
            clashes = True

        if clashes:
            yield d

def filter_unwanted_shifts(solutions):
    from models import Solution

    print "possibly unwanted:",  solutions
    new_solutions = []
    new_duties = []

    for sol in solutions:
        for duty in sol.duties():
            duty_ok = True

            if not duty.worker.available_for_shift(duty.shift):
                duty_ok = False

            if duty_ok:
                print "duty OK:"
                duty.dump(depth=1)
                new_duties.append(duty)
            else:
                print "duty **NOT** OK:"
                duty.dump(depth=1)

        shifts = set([ d.shift for d in new_duties ])
        shift_lists = []
        for s in shifts:
            shift_duties = [ d for d in new_duties if d.shift == s ]
            shift_lists.append(shift_duties)

        new_solutions.append(Solution(shift_lists))

    return new_solutions

def filter_clashing_duties(solutions):
    new_solutions = []

    for sol in solutions:
        solution_ok = True

        for duty in sol.duties():
            num_clashing_duties = len(set(find_clashing_duties(duty, sol.duties())))

            # check if many duties collide with this one (and thus we should delete this one
            if num_clashing_duties > 0:
                solution_ok = False
                break

        if solution_ok:
            new_solutions.append(sol)

    return new_solutions

def filter_incomplete_shifts(solutions):
    new_solutions = []

    shift_duty_count = {}

    for sol in solutions:
        solution_ok = True

        for shift in set([ duty.shift for duty in sol.duties() ]):
            shift_duties = [ d for d in sol.duties() if d.shift == shift ]
            num_workers = len(set([ d.worker for d in shift_duties ]))

            if num_workers < shift.min_coverage:
                solution_ok = False

        if solution_ok:
            new_solutions.append(sol)

    return new_solutions

def filter_solutions(solutions,  workers):
    # filter permutations ############################
    # for each solution
    solutions = filter_unwanted_shifts(solutions)
    solutions = filter_clashing_duties(solutions)
    solutions = filter_incomplete_shifts(solutions)
    return solutions

def prioritise_solutions(solutions):
    # TODO: not implemented!
    return solutions

    # prioritise solutions ############################
    # for all solutions
        # score according to number of staff on a duty
        # score according to male/female staff
        # score according to skill/background diversity
        # score according to when staff last on shift

    # sort all solutions by score

def solve_duties(shifts, duties,  workers):
    # ramify all possible duties #########################
    perms = duty_permutations(shifts,  duties)
    solutions = solutions_for_duty_permutations(perms)
    solutions = filter_solutions(solutions,  workers)
    solutions = prioritise_solutions(solutions)
    return solutions

def load_shifts():
    from models import Shift
    shifts = [
        Shift(am2, am8, 2, 3),
        Shift(am8, pm12, 2, 3),
    ]
    return shifts

def load_workers():
    from models import Avail, Worker
    joe_avail = ( Avail(am2,  am8), )
    sam_avail = ( Avail(am2,  am8), )
    ned_avail = ( Avail(am2,  am8), )
    bob_avail = ( Avail(am8,  pm12), )
    max_avail = ( Avail(am8,  pm12), )
    joe = Worker("joe", joe_avail)
    sam = Worker("sam", sam_avail)
    ned = Worker("ned", sam_avail)
    bob = Worker("bob", bob_avail)
    max = Worker("max", max_avail)
    return (joe, sam, ned, bob, max)

def main():
    import sys

    shifts = load_shifts()
    workers = load_workers()
    duties = duties_for_all_workers(shifts, workers)
    solutions = solve_duties(shifts, duties, workers)

    if len(solutions) == 0:
        print "Sorry, can't solve this.  Perhaps you need more staff available, or"
        print "simpler duty constraints?"
        sys.exit(20)
    else:
        print "Solved.  Solutions found:"
        for sol in solutions:
            sol.dump()

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

在结果之前剪切调试输出,这当前给出:

Solved.  Solutions found:
    <Solution
        <ShiftList
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker joe
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker sam
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker ned
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
        >
        <ShiftList
            <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                <Worker bob
                    <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                <Worker max
                    <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                >
            >
        >
    >
Run Code Online (Sandbox Code Playgroud)

Mat*_* M. 1

好吧,我不知道特定的算法,但这是我会考虑的。

评估

无论采用哪种方法,您都需要一个函数来评估您的解决方案满足约束的程度。您可以采用“比较”方法(没有全局分数,而是比较两个解决方案的方法),但我建议进行评估。

真正好的是,如果您可以获得较短时间跨度的分数,例如每天,如果您可以从部分解决方案“预测”最终分数的范围(例如,仅前 3 个分数),这对算法确实很有帮助7 天)。这样,如果它已经太低而无法满足您的期望,您可以中断基于此部分解决方案的计算。

对称

很可能在这 200 个人中,您有相似的个人资料:即人们具有相同的特征(可用性、经验、意愿……)。如果你有两个具有相同个人资料的人,他们将可以互换:

  • 解决方案 1:(班次 1:Joe)(班次 2:Jim)...仅限其他工人...
  • 解决方案 2:(班次 1:Jim)(班次 2:Joe)...仅限其他工人...

从您的角度来看,实际上是相同的解决方案。

好处是,通常情况下,您的配置文件比人少,这对节省计算时间有很大帮助!

例如,假设您根据解决方案 1 生成了所有解决方案,那么无需根据解决方案 2 计算任何内容。

迭代

您可以考虑增量生成(例如一次 1 周),而不是立即生成整个计划。净收益是一周的复杂性降低了(可能性更少)。

然后,一旦你有了这周的时间,你就计算第二个,当然要小心地将第一个考虑到你的限制。

优点是您明确设计算法以考虑已使用的解决方案,这样对于下一个时间表生成,它将确保不会让一个人连续 24 小时工作!

序列化

您应该考虑解决方案对象的序列化(选择您的选择,pickle 对于 Python 来说非常好)。在生成新计划时,您将需要之前的计划,而且我敢打赌您不会为这 200 人手动输入它。

详尽无遗

现在,毕竟,我实际上赞成进行详尽的搜索,因为使用对称性和评估的可能性可能不会那么多(尽管问题仍然是 NP 完全的,但没有灵丹妙药)。

您可能愿意尝试回溯算法

另外,您应该查看以下处理类似问题的链接:

两者都讨论了实施过程中遇到的问题,因此检查它们应该会对您有所帮助。