nc3*_*c3b 48 concurrency multithreading design-patterns threadpool
我一直在阅读线程池模式,我似乎无法找到以下问题的通常解决方案.
我有时希望连续执行任务.例如,我从文件中读取文本块,由于某种原因,我需要按顺序处理块.所以基本上我想消除一些任务的并发性.
考虑这种情况,其中*需要按照推入的顺序处理任务.其他任务可以按任何顺序处理.
push task1
push task2
push task3 *
push task4 *
push task5
push task6 *
....
and so on
Run Code Online (Sandbox Code Playgroud)
在线程池的上下文中,没有这个约束,一个待处理任务的单个队列工作正常,但在这里它没有.
我想过让一些线程在特定于线程的队列上运行,而其他线程在"全局"队列上运行.然后,为了以串行方式执行某些任务,我只需将它们推送到单个线程所在的队列中.这确实听起来有点笨拙.
所以,这个长篇故事中的真正问题是:你将如何解决这个问题?您如何确保订购这些任务?
作为一个更普遍的问题,假设上面的场景变为
push task1
push task2 **
push task3 *
push task4 *
push task5
push task6 *
push task7 **
push task8 *
push task9
....
and so on
Run Code Online (Sandbox Code Playgroud)
我的意思是组内的任务应该按顺序执行,但组本身可以混合使用.所以你可以3-2-5-4-7举个例子.
另外需要注意的是,我无法预先访问组中的所有任务(我不能等到所有任务在启动组之前到达).
感谢您的时间.
Tim*_*oyd 17
类似下面的内容将允许串行和并行任务排队,其中串行任务将一个接一个地执行,并行任务将以任何顺序执行,但并行执行.这使您能够在必要时序列化任务,也可以执行并行任务,但是在接收任务时执行此操作,即您不需要预先了解整个序列,动态维护执行顺序.
internal class TaskQueue
{
private readonly object _syncObj = new object();
private readonly Queue<QTask> _tasks = new Queue<QTask>();
private int _runningTaskCount;
public void Queue(bool isParallel, Action task)
{
lock (_syncObj)
{
_tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
}
ProcessTaskQueue();
}
public int Count
{
get{lock (_syncObj){return _tasks.Count;}}
}
private void ProcessTaskQueue()
{
lock (_syncObj)
{
if (_runningTaskCount != 0) return;
while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
{
QTask parallelTask = _tasks.Dequeue();
QueueUserWorkItem(parallelTask);
}
if (_tasks.Count > 0 && _runningTaskCount == 0)
{
QTask serialTask = _tasks.Dequeue();
QueueUserWorkItem(serialTask);
}
}
}
private void QueueUserWorkItem(QTask qTask)
{
Action completionTask = () =>
{
qTask.Task();
OnTaskCompleted();
};
_runningTaskCount++;
ThreadPool.QueueUserWorkItem(_ => completionTask());
}
private void OnTaskCompleted()
{
lock (_syncObj)
{
if (--_runningTaskCount == 0)
{
ProcessTaskQueue();
}
}
}
private class QTask
{
public Action Task { get; set; }
public bool IsParallel { get; set; }
}
}
Run Code Online (Sandbox Code Playgroud)
更新
要使用串行和并行任务组合处理任务组,GroupedTaskQueue可以TaskQueue为每个组管理一个.同样,您不需要预先知道组,它们都是在接收任务时动态管理的.
internal class GroupedTaskQueue
{
private readonly object _syncObj = new object();
private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
private readonly string _defaultGroup = Guid.NewGuid().ToString();
public void Queue(bool isParallel, Action task)
{
Queue(_defaultGroup, isParallel, task);
}
public void Queue(string group, bool isParallel, Action task)
{
TaskQueue queue;
lock (_syncObj)
{
if (!_queues.TryGetValue(group, out queue))
{
queue = new TaskQueue();
_queues.Add(group, queue);
}
}
Action completionTask = () =>
{
task();
OnTaskCompleted(group, queue);
};
queue.Queue(isParallel, completionTask);
}
private void OnTaskCompleted(string group, TaskQueue queue)
{
lock (_syncObj)
{
if (queue.Count == 0)
{
_queues.Remove(group);
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
Ant*_*ams 14
线程池适用于任务的相对顺序无关紧要的情况,只要它们全部完成即可.特别是,它们必须并行完成.
如果您的任务必须按特定顺序完成,那么它们不适合并行性,因此线程池不合适.
如果要将这些串行任务移出主线程,则具有任务队列的单个后台线程将适合这些任务.您可以继续将线程池用于适用于并行性的其余任务.
是的,这意味着你必须决定在哪里提交任务,具体取决于它是有序任务还是"可以并行化"任务,但这不是什么大问题.
如果您的组必须序列化,但可以与其他任务并行运行,那么您有多种选择:
基本上,有许多待定任务.某些任务只能在一个或多个其他待处理任务完成执行时执行.
待处理任务可以在依赖关系图中建模:
所以(至少)有一个线程用于添加/删除挂起的任务,并且有一个工作线程的线程池.
将任务添加到依赖关系图时,您必须检查:
表现:
假设:
您可能已经阅读过这些行,您必须设计任务,以便它们不会干扰其他任务.此外,必须有一种方法来确定任务的优先级.任务优先级应包括每个任务处理的数据.两个任务可能不会同时改变同一个对象; 其中一个任务应该优先于另一个任务,或者对象上执行的操作必须是线程安全的.
要执行您想要对线程池执行的操作,您可能必须创建某种调度程序.
像这样的东西:
TaskQueue - > Scheduler - > Queue - > ThreadPool
调度程序在其自己的线程中运行,保留作业之间的依赖关系.当一个作业准备好完成时,调度程序只是将它推送到线程池的队列中.
ThreadPool可能必须向调度程序发送信号以指示作业何时完成,以便调度程序可以根据该作业将作业放入队列中.
在您的情况下,依赖项可能存储在链接列表中.
假设您有以下依赖项:3 - > 4 - > 6 - > 8
作业3正在线程池上运行,您仍然不知道作业8存在.
工作3结束.从链表中删除3,将作业4放在队列中的线程池中.
工作8到了.你把它放在链表的末尾.
唯一必须完全同步的构造是调度程序之前和之后的队列.
如果我正确理解了这个问题,jdk executors 没有这个能力,但是很容易推出你自己的。你基本上需要
ExecutorService)与 jdk 执行器的不同之处在于它们有 1 个包含 n 个线程的队列,但您需要 n 个队列和 m 个线程(其中 n 可能等于也可能不等于 m)
* 阅读后编辑每个任务都有一个键 *
详细一点
key.hashCode() % n也可能是已知键值的一些静态映射线程或任何你想要的将自动重新启动的工作线程添加到这个方案就足够容易了,然后你需要工作线程向某个管理器注册以声明“我拥有这个队列”,然后围绕该队列进行一些内务处理 + 检测线程中的错误(这意味着它取消注册该队列的所有权,将队列返回到空闲队列池,这是启动新线程的触发器)
| 归档时间: |
|
| 查看次数: |
25661 次 |
| 最近记录: |