使用内部同步实现作业列表

Ant*_*eru 7 parallel-processing performance multithreading scalability

我正在研究一个简单的工作线程框架,它与id Tech 5 Challenges中描述的框架非常相似.在最基本的层面上,我有一组作业列表,我想在一堆CPU线程中安排这些列表(使用标准线程池进行实际调度.)但是,我想知道这个信号/等待的东西是怎么回事在等待列表中可以有效地实现.据我了解,如果信号令牌尚未执行,则等待令牌会阻止列表执行.这隐含地意味着信号之前的所有内容必须在信号可以被提升之前完成.所以我们假设我们有一个这样的列表:

J1, J2, S, J3, W, J4
Run Code Online (Sandbox Code Playgroud)

然后调度可以像这样:

#1: J1, J2, J3
<wait for J1, J2, run other lists if possible>
#2: J4
Run Code Online (Sandbox Code Playgroud)

然而,这并不像它看起来那么容易,因为给定一组名单,我将不得不将部分之间readywaiting,也有特殊代码的信号和标签的东西在他们之前收集的所有作业,让他们可以触发信号,当且仅当他们都完成的(意为实例,它是不再可能的工作添加到列表中,同时执行它,如下面的信号访问先前插入作业).

是否有任何"标准"的方式有效地实现这一点?我也想知道如何最好地安排作业列表执行,现在,每个核心抓取一个作业列表,并安排其中的所有作业,这提供了非常好的扩展(对于32k作业,0.7毫秒,我得到101%,我猜部分原因是单线程版本有时被安排到不同的核心上.)

Ray*_*rns 4

这是一种比较简单的调度算法。有几个问题乍一看似乎很棘手,但实际上并非如此(信号/等待和缓存位置)。我将解释这些技术,然后给出一些我编写的代码来说明这些概念,然后给出一些有关调优的最后注释。

使用的算法

有效地处理信号/等待一开始似乎很棘手,但实际上却非常容易。由于信号/等待对不能嵌套或重叠,因此在任何给定时间实际上只能有两个被满足,一个被等待。只需保留一个指向最近未满足的信号的“CurrentSignal”指针就可以完成簿记。

确保内核不会在列表之间跳转太多,并且给定列表不会在太多内核之间共享也相对容易:每个内核不断从同一列表中获取作业,直到阻塞,然后切换到另一个列表。为了防止所有核心聚集在一个列表上,为每个列表保留一个 WorkerCount 来告诉有多少个核心正在使用它,并且对列表进行组织,以便核心首先选择具有较少工作线程的列表。

锁定可以保持简单,只需锁定调度程序或您随时正在处理的列表,而不是同时锁定两者。

您对在列表已经开始执行后将作业添加到列表中表示了一些担忧。事实证明,支持这一点几乎是微不足道的:当将作业添加到当前已完成的列表时,它所需要的只是从列表到调度程序的调用,以便调度程序可以调度新作业。

数据结构

以下是您需要的基本数据结构:

class Scheduler
{
  LinkedList<JobList>[] Ready; // Indexed by number of cores working on list
  LinkedList<JobList> Blocked;
  int ReadyCount;
  bool Exit;

  public:
    void AddList(JobList* joblist);
    void DoWork();

  internal:
    void UpdateQueues(JobList* joblist);

    void NotifyBlockedCores();
    void WaitForNotifyBlockedCores();
}

class JobList
{
  Scheduler Scheduler;
  LinkedList<JobList> CurrentQueue;

  LinkedList<Job> Jobs;            // All jobs in the job list
  LinkedList<SignalPoint> Signals; // All signal/wait pairs in the job list,
                                      plus a dummy

  Job* NextJob;                    // The next job to schedule, if any
  int NextJobIndex;                // The index of NextJob

  SignalPoint* CurrentSignal;      // First signal not fully satisfied

  int WorkerCount;                 // # of cores executing in this list

  public:
    void AddJob(Job* job);
    void AddSignal();
    void AddWait();

  internal:
    void Ready { get; }
    void GetNextReadyJob(Job& job, int& jobIndex);
    void MarkJobCompleted(Job job, int jobIndex);
}
class SignalPoint
{
  int SignalJobIndex = int.MaxValue;
  int WaitJobIndex = int.MaxValue;
  int IncompleteCount = 0;
}
Run Code Online (Sandbox Code Playgroud)

请注意,给定作业列表的信号点最方便地与实际作业列表分开存储。

调度器实现

调度程序跟踪作业列表,将它们分配给核心,并执行作业列表中的作业。

AddList 将作业添加到调度程序。它必须被放置在“就绪”或“阻塞”队列中,具体取决于它是否有任何工作要做(即是否已添加任何作业),因此只需调用 UpdateQueues 即可。

void Scheduler.AddList(JobList* joblist)
{
  joblist.Scheduler = this;
  UpdateQueues(joblist);
}
Run Code Online (Sandbox Code Playgroud)

UpdateQueues 集中了队列更新逻辑。请注意选择新队列的算法,以及当工作可用时向空闲核心发出的通知:

void Scheduler.UpdateQueues(JobList* joblist)
{
  lock(this)
  {
    // Remove from prior queue, if any
    if(joblist.CurrentQueue!=null)
    {
      if(joblist.CurrentQueue!=Blocked) ReadyCount--;
      joblist.CurrentQueue.Remove(joblist);
    }

    // Select new queue
    joblist.CurrentQueue = joblist.Ready ? Ready[joblist.WorkerCount] : Blocked;

    // Add to new queue
    joblist.CurrentQueue.Add(joblist);
    if(joblist.CurrentQueue!=Blocked)
      if(++ReadyCount==1) NotifyBlockedCores();
  }
}
Run Code Online (Sandbox Code Playgroud)

DoWork 是一个正常的调度程序工作,除了: 1. 它选择具有最少工作人员的 JobList, 2. 它处理给定作业列表中的作业,直到无法再处理为止,以及 3. 它存储 jobIndex 以及作业,以便joblist可以轻松更新完成状态(实施细节)。

void Scheduler.DoWork()
{
  while(!Exit)
  {
    // Get a job list to work on
    JobList *list = null;
    lock(this)
    {
      for(int i=0; i<Ready.Length; i++)
        if(!Ready[i].Empty)
        {
          list = Ready[i].First;
          break;
        }
      if(list==null)  // No work to do
      {
        WaitForNotifyBlockedCores();
        continue;
      }
      list.WorkerCount++;
      UpdateQueues(list);
    }

    // Execute jobs in the list as long as possible
    while(true)
    {
      int jobIndex;
      Job job;
      if(!GetNextReadyJob(&job, &jobIndex)) break;

      job.Execute();

      list.MarkJobCompleted(job, jobIndex);
    }

    // Release the job list
    lock(this)
    {
      list.WorkerCount--;
      UpdateQueues(list);
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

工作清单实施

JobList 跟踪信号/等待如何散布在作业中,并跟踪哪些信号/等待对在其信号点之前已经完成了所有操作。

构造函数创建一个虚拟信号点来添加作业。每当添加新的“信号”时,该信号点就变成真实信号点(并且添加新的虚拟信号点)。

JobList.JobList()
{
  // Always have a dummy signal point at the end
  Signals.Add(CurrentSignal = new SignalPoint());
}
Run Code Online (Sandbox Code Playgroud)

AddJob 将作业添加到列表中。它在 SignalPoint 中被标记为不完整。当作业实际执行时,同一SignalPoint的IncompleteCount会递减。还需要告诉调度程序事情可能已经改变,因为新作业可以立即执行。请注意,调度程序在“this”上的锁释放后调用,以避免死锁。

void JobList.AddJob(Job job)
{
  lock(this)
  {
    Jobs.Add(job);
    Signals.Last.IncompleteCount++;
    if(NextJob == null)
      NextJob = job;
  }
  if(Scheduler!=null)
    Scheduler.UpdateQueues(this);
}
Run Code Online (Sandbox Code Playgroud)

AddSignal 和 AddWait 将信号和等待添加到作业列表中。注意,AddSignal实际上创建了一个新的SignalPoint,而AddWait只是在之前创建的SignalPoint中填充了等待点信息。

void JobList.AddSignal()
{
  lock(this)
  {
    Signals.Last.SignalJobIndex = Jobs.Count;  // Reify dummy signal point
    Signals.Add(new SignalPoint());            // Create new dummy signal point
  }
}


void JobList.AddWait()
{
  lock(this)
  {
    Signals.Last.Previous.WaitJobIndex = Jobs.Count;
  }
}
Run Code Online (Sandbox Code Playgroud)

Ready 属性确定列表是否已准备好分配给它的其他核心。如果下一个作业在开始之前等待信号,则可能有两个或三个核心在列表上工作,而列表尚未“准备好”。

bool JobList.Ready
{
  get
  {
    lock(this)
    {
      return NextJob!=null &&
        (CurrentSignal==Signals.Last ||
         NextJobIndex < CurrentSignal.WaitJobIndex);
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

GetNextReadyJob 非常简单:如果我们准备好了,只需返回列表中的下一个作业即可。

void JobList.GetNextReadyJob(Job& job, int& jobIndex)
{
  lock(this)
  {
    if(!Ready) return false;
    jobIndex = list.NextJobIndex++;
    job = list.NextJob; list.NextJob = job.Next;
    return true;

  }
}
Run Code Online (Sandbox Code Playgroud)

MarkJobCompleted 可能是其中最有趣的。由于信号和等待的结构,当前作业要么在 CurrentSignal 之前,要么在 CurrentSignal 和 CurrentSignal.Next 之间(如果它在最后一个实际信号之后,则将被计为在 CurrentSignal 和末尾的虚拟 SignalPoint 之间) )。我们需要减少未完成工作的数量。如果该计数变为零,我们可能还需要继续处理下一个信号。当然,最后我们永远不会通过虚拟 SignalPoint。

请注意,此代码没有调用 Scheduler.UpdateQueue,因为我们知道调度程序将在一秒钟内调用 GetNextReadyJob,如果它返回 false,无论如何它都会调用 UpdateQueue。

void JobList.MarkJobCompleted(Job job, int jobIndex)
{
  lock(this)
  {
    if(jobIndex >= CurrentSignal.SignalJobIndex)
      CurrentSignal.Next.IncompleteCount--;
    else
    {
      CurrentSignal.IncompleteCount--;
      if(CurrentSignal.IncompleteCount==0)
        if(CurrentSignal.WaitJobIndex < int.MaxValue)
          CurrentSignal = CurrentSignal.Next;
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

根据列表长度、工作长度估计等进行调整

上面的代码没有关注作业列表的长度,因此,如果有一百个小型作业列表和一个巨大的作业列表,则每个核心可能会采用一个单独的小型作业列表,然后全部聚集在巨大的作业列表上一、导致效率低下。这可以通过使 Ready[] 成为优先级队列数组来解决(joblist.Jobs.Count - joblist.NextJobIndex),但优先级仅在正常 UpdateQueue 情况下实际更新以提高效率。

通过创建考虑信号/等待组合的数量和间隔来确定优先级的启发式,这可以变得更加复杂。这种启发式最好通过使用作业持续时间和资源使用的分布来调整。

如果单个作业的持续时间已知,或者如果可以对其进行良好的估计,则启发式可以使用估计的剩余持续时间而不仅仅是列表长度。

最后的笔记

这是针对您提出的问题的相当标准的解决方案。您可以使用我给出的算法,它们会起作用,包括锁定,但由于以下几个原因,您将无法编译我上面编写的代码:

  1. 它是 C++ 和 C# 语法的疯狂组合。我最初开始用 C# 编写,然后将一些语法更改为 C++ 风格,因为我认为这更可能是您在此类项目中使用的语法。但我留下了相当多的 C# 主义。幸运的是没有 LINQ;-)。

  2. LinkedList 的细节有些令人困惑。我假设列表可以执行“第一个”、“最后一个”、“添加”和“删除”,并且列表中的项目可以执行“上一个”和“下一个”。但我没有对我所知道的任何真正的链表类使用实际的 API。

  3. 我没有编译或测试它。我保证某处有一两个错误。

底线:您应该将上面的代码视为伪代码,即使它看起来像真正的 McCoy。

享受!