确保线程池中的任务执行顺序

nc3*_*c3b 48 concurrency multithreading design-patterns threadpool

我一直在阅读线程池模式,我似乎无法找到以下问题的通常解决方案.

我有时希望连续执行任务.例如,我从文件中读取文本块,由于某种原因,我需要按顺序处理块.所以基本上我想消除一些任务的并发性.

考虑这种情况,其中*需要按照推入的顺序处理任务.其他任务可以按任何顺序处理.

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on
Run Code Online (Sandbox Code Playgroud)

在线程池的上下文中,没有这个约束,一个待处理任务的单个队列工作正常,但在这里它没有.

我想过让一些线程在特定于线程的队列上运行,而其他线程在"全局"队列上运行.然后,为了以串行方式执行某些任务,我只需将它们推送到单个线程所在的队列中.这确实听起来有点笨拙.

所以,这个长篇故事中的真正问题是:你将如何解决这个问题?您如何确保订购这些任务

编辑

作为一个更普遍的问题,假设上面的场景变为

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on
Run Code Online (Sandbox Code Playgroud)

我的意思是组内的任务应该按顺序执行,但组本身可以混合使用.所以你可以3-2-5-4-7举个例子.

另外需要注意的是,我无法预先访问组中的所有任务(我不能等到所有任务在启动组之前到达).

感谢您的时间.

Tim*_*oyd 17

类似下面的内容将允许串行和并行任务排队,其中串行任务将一个接一个地执行,并行任务将以任何顺序执行,但并行执行.这使您能够在必要时序列化任务,也可以执行并行任务,但是在接收任务时执行此操作,即您不需要预先了解整个序列,动态维护执行顺序.

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }

        ProcessTaskQueue();
    }

    public int Count
    {
        get{lock (_syncObj){return _tasks.Count;}}
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            if (_runningTaskCount != 0) return;

            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            }

            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();

                QueueUserWorkItem(serialTask);
            }
        }
    }

    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            qTask.Task();

            OnTaskCompleted();
        };

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}
Run Code Online (Sandbox Code Playgroud)

更新

要使用串行和并行任务组合处理任务组,GroupedTaskQueue可以TaskQueue为每个组管理一个.同样,您不需要预先知道组,它们都是在接收任务时动态管理的.

internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

    public void Queue(string group, bool isParallel, Action task)
    {
        TaskQueue queue;

        lock (_syncObj)
        {
            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();

                _queues.Add(group, queue);
            }
        }

        Action completionTask = () =>
        {
            task();

            OnTaskCompleted(group, queue);
        };

        queue.Queue(isParallel, completionTask);
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            if (queue.Count == 0)
            {
                _queues.Remove(group);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)


Ant*_*ams 14

线程池适用于任务的相对顺序无关紧要的情况,只要它们全部完成即可.特别是,它们必须并行完成.

如果您的任务必须按特定顺序完成,那么它们不适合并行性,因此线程池不合适.

如果要将这些串行任务移出主线程,则具有任务队列的单个后台线程将适合这些任务.您可以继续将线程池用于适用于并行性的其余任务.

是的,这意味着你必须决定在哪里提交任务,具体取决于它是有序任务还是"可以并行化"任务,但这不是什么大问题.

如果您的组必须序列化,但可以与其他任务并行运行,那么您有多种选择:

  1. 为每个组创建一个任务,按顺序执行相关的组任务,并将此任务发布到线程池.
  2. 让组中的每个任务显式等待组中的上一个任务,并将它们发布到线程池中.这要求您的线程池可以处理线程正在等待尚未安排的任务而没有死锁的情况.
  3. 为每个组都有一个专用线程,并在适当的消息队列上发布组任务.


pvo*_*ten 8

基本上,有许多待定任务.某些任务只能在一个或多个其他待处理任务完成执行时执行.

待处理任务可以在依赖关系图中建模:

  • "任务1 - >任务2"表示"任务2只能在任务1完成后才能执行".箭头指向执行顺序的方向.
  • 任务的indegree(指向它的任务数)确定任务是否准备好执行.如果indegree为0,则可以执行.
  • 有时任务必须等待多个任务完成,然后indegree> 1.
  • 如果任务不再需要等待其他任务完成(其indegree为零),则可以将其提交给具有工作线程的线程池,或者具有等待工作线程拾取的任务的队列.您知道提交的任务不会导致死锁,因为任务不会等待任何事情.作为优化,您可以使用优先级队列,例如,首先执行依赖关系图中的更多任务所依赖的任务.这也不会引发死锁,因为线程池中的所有任务都可以执行.然而,它可能引发饥饿.
  • 如果任务完成执行,则可以从依赖关系图中删除它,可能会减少其他任务的不确定性,而这些任务又可以提交到工作线程池中.

所以(至少)有一个线程用于添加/删除挂起的任务,并且有一个工作线程的线程池.

将任务添加到依赖关系图时,您必须检查:

  • 如何在依赖关系图中连接任务:它必须等待完成哪些任务以及哪些任务必须等待它完成?相应地从新任务中绘制连接.
  • 绘制连接后:新连接是否导致依赖关系图中的任何循环?如果是这样,就会出现僵局.

表现:

  • 如果并行执行实际上很少可能,则此模式比顺序执行慢,因为无论如何,您需要额外的管理才能完成所有操作.
  • 如果在实践中可以同时执行许多任务,则此模式很快.

假设:

您可能已经阅读过这些行,您必须设计任务,以便它们不会干扰其他任务.此外,必须有一种方法来确定任务的优先级.任务优先级应包括每个任务处理的数据.两个任务可能不会同时改变同一个对象; 其中一个任务应该优先于另一个任务,或者对象上执行的操作必须是线程安全的.


Mar*_*tin 6

要执行您想要对线程池执行的操作,您可能必须创建某种调度程序.

像这样的东西:

TaskQueue - > Scheduler - > Queue - > ThreadPool

调度程序在其自己的线程中运行,保留作业之间的依赖关系.当一个作业准备好完成时,调度程序只是将它推送到线程池的队列中.

ThreadPool可能必须向调度程序发送信号以指示作业何时完成,以便调度程序可以根据该作业将作业放入队列中.

在您的情况下,依赖项可能存储在链接列表中.

假设您有以下依赖项:3 - > 4 - > 6 - > 8

作业3正在线程池上运行,您仍然不知道作业8存在.

工作3结束.从链表中删除3,将作业4放在队列中的线程池中.

工作8到了.你把它放在链表的末尾.

唯一必须完全同步的构造是调度程序之前和之后的队列.


Mat*_*att 5

如果我正确理解了这个问题,jdk executors 没有这个能力,但是很容易推出你自己的。你基本上需要

  • 一个工作线程池,每个线程都有一个专用队列
  • 对您提供工作的那些队列的一些抽象(参见ExecutorService
  • 某些算法为每个工作确定性地选择特定队列
  • 然后每件工作都会被提供到正确的队列中,从而以正确的顺序进行处理

与 jdk 执行器的不同之处在于它们有 1 个包含 n 个线程的队列,但您需要 n 个队列和 m 个线程(其中 n 可能等于也可能不等于 m)

* 阅读后编辑每个任务都有一个键 *

详细一点

  • 编写一些代码,将键转换为给定范围内的索引(整数)(0-n,其中 n 是您想要的线程数),这可能很简单,key.hashCode() % n也可能是已知键值的一些静态映射线程或任何你想要的
  • 启动时
    • 创建 n 个队列,将它们放入索引结构中(数组,列出任何内容)
    • 启动 n 个线程,每个线程只从队列中执行阻塞操作
    • 当它收到一些工作时,它知道如何执行特定于该任务/事件的工作(如果您有异构事件,您显然可以将任务映射到操作)
  • 将其存储在接受工作项的某个外观后面
  • 当任务到达时,将其交给门面
    • 外观根据键为任务找到正确的队列,并将其提供给该队列

将自动重新启动的工作线程添加到这个方案就足够容易了,然后你需要工作线程向某个管理器注册以声明“我拥有这个队列”,然后围绕该队列进行一些内务处理 + 检测线程中的错误(这意味着它取消注册该队列的所有权,将队列返回到空闲队列池,这是启动新线程的触发器)


归档时间:

查看次数:

25661 次

最近记录:

7 年,5 月 前