管理动态线程数

Jer*_*dge 7 delphi queue multithreading list delphi-xe2

首先,我仍然熟悉多线程,并且不太了解术语.我需要确保我做得对,因为这是一个敏感话题.

产品规格

我正在构建的是一个包含动态线程数的组件.这些线程中的每一个都被重用于执行许多请求.我可以在创建它时以及在执行它之前为线程提供所有必要的细节,以及提供事件处理程序.一旦它被执行,我几乎完成了一个请求,我提供了另一个请求.请求从另一个独立的后台线程被送入这些线程,后台线程不断处理请求队列.所以这个系统有两个列表:1)请求记录列表,2)线程指针列表.

我正在使用TThread该类的后代(至少这是我熟悉的线程方法).我通过同步创建线程时分配的事件触发器从线程获得反馈.线程正在后台加载和保存数据,当它们完成后,它们会自行重置以准备处理下一个请求.

问题

现在的麻烦决定如何(通过组件的属性处理不断变化的允许线程的数量的情况下,当开始ActiveThreads: TActiveThreadRangeTActiveThreadRange= 1..20).因此,一次可以创建1到20个线程.但是,当我们说,使用这个组件的应用程序将此属性从5更改为3.此时,已经创建了5个线程,并且我不想强行释放该线程,如果它正好忙.我需要等到它完成后才能释放它.另一方面,如果属性从3更改为5,那么我需要创建2个新线程.我需要知道在这种情况下"跟踪"这些线程的正确方法.

可能性

以下是我可以想到的"跟踪"这些线程的一些可能方法......

  • 保持TList包含每个创建的线程 - 易于管理
  • 创建一个TList包含每个创建的线程的包装器或后代 - 更易于管理,但需要更多工作
  • 保持array包含每个创建的线程 - 这会比一个更好TList吗?
  • 创建一个包含每个创建的线程的数组包装器

但回到我原来的问题 - 当ActiveThreads属性减少时如何处理现有繁忙的线程?创建它们没有问题,但释放它们变得令人困惑.我通常制作自己解放的线程,但这是我第一次制作一个可以重复使用的线程.我只需要知道在不中断任务的情况下销毁这些线程的正确方法.

更新

根据反馈,我已经获得并开始实现OmniThreadLibrary(以及长期需要的FastMM).我也改变了我的方法 - 我可以创建这些线程进程而无需管理它们,也没有其他线程来处理队列...

  • 1个生成新进程的主方法
    • function NewProcess(const Request: TProcessRequest): TProcessInfo;
    • TProcessRequest 是一个记录,其中包含要执行的操作的规范(文件名,选项等)
    • TProcessInfo 是一个传回一些状态信息的记录.
  • 在创建新流程时,为事件"完成"事件处理事件处理程序.当组件收到此消息时,它将检查队列.
    • 如果命令排队,它会将活动进程限制与当前进程计数进行比较
    • >如果超出限制,只需停止,下一个完成的过程将执行相同的检查
    • >如果在限制范围内,启动另一个新流程(确保之前的流程完成后)
    • 如果没有命令排队,那么就停止
  • 每个进程在完成任务后都可以自行死亡(没有保持活动的线程)
  • 我不必担心另一个计时器或线程不断循环
    • 相反,每个进程都会破坏其自身并在执行此操作之前检查新请求

另一个更新

我实际上已经恢复使用了一个TThread,因为OTL非常不舒服.我喜欢在自己的课堂上把东西包裹起来.

Mar*_*mes 5

正如@NGLN等所解释的那样,您需要汇集一些线程并接受管理线程数的最简单方法是将实际线程数与所需数量分开.将线程添加到池中很容易 - 只需创建更多实例,(将生产者 - 消费者任务输入队列作为参数传递,以便线程知道要等待的内容).如果所需的线程数小于当前存在的线程数,您可以排队足够的'poison-pills'以消除额外的线程.

不要保留任何线程指针列表 - 这是一个微观管理麻烦的负担,这是不必要的,(并且可能会出错).您需要保留的只是池中所需线程数的计数,这样您就可以知道在更改"poolDepth"属性时要采取的操作.

事件触发器最好加载到发布到池中的作业中 - 从一些"TpooledTask"类中将它们全部下载,该类将事件作为构造函数参数并将其存储在某些"FonComplete"TNotifyEvent中.运行任务的线程可以在完成作业时调用FonComplete(使用TpooledTask作为sender参数) - 您不需要知道哪个线程运行了该任务.

例:

    unit ThreadPool;

    interface

    uses
      Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
      Dialogs, StdCtrls, contnrs, syncobjs;


    type

    TpooledTask=class(TObject)
    private
      FonComplete:TNotifyEvent;
    protected
      Fparam:TObject;
      procedure execute; virtual; abstract;
    public
      constructor create(onComplete:TNotifyEvent;param:TObject);
    end;

    TThreadPool=class(TObjectQueue)
    private
      access:TcriticalSection;
      taskCounter:THandle;
      threadCount:integer;
    public
      constructor create(initThreads:integer);
      procedure addTask(aTask:TpooledTask);
    end;

    TpoolThread=class(Tthread)
    private
      FmyPool:TThreadPool;
    protected
      procedure Execute; override;
    public
      constructor create(pool:TThreadPool);
    end;

    implementation

    { TpooledTask }

    constructor TpooledTask.create(onComplete: TNotifyEvent; param: TObject);
    begin
      FonComplete:=onComplete;
      Fparam:=param;
    end;

    { TThreadPool }

    procedure TThreadPool.addTask(aTask: TpooledTask);
    begin
      access.acquire;
      try
        push(aTask);
      finally
        access.release;
      end;
      releaseSemaphore(taskCounter,1,nil); // release one unit to semaphore
    end;

    constructor TThreadPool.create(initThreads: integer);
    begin
      inherited create;
      access:=TcriticalSection.create;
      taskCounter:=createSemaphore(nil,0,maxInt,'');
      while(threadCount<initThreads) do
      begin
        TpoolThread.create(self);
        inc(threadCount);
      end;
    end;

    { TpoolThread }

    constructor TpoolThread.create(pool: TThreadPool);
    begin
      inherited create(true);
      FmyPool:=pool;
      FreeOnTerminate:=true;
      resume;
    end;

procedure TpoolThread.execute;
var thisTask:TpooledTask;
begin
  while (WAIT_OBJECT_0=waitForSingleObject(FmyPool.taskCounter,INFINITE)) do
  begin
    FmyPool.access.acquire;
    try
      thisTask:=TpooledTask(FmyPool.pop);
    finally
      FmyPool.access.release;
    end;
    thisTask.execute;
    if assigned(thisTask.FonComplete) then thisTask.FonComplete(thisTask);
  end;
end;

end.
Run Code Online (Sandbox Code Playgroud)


NGL*_*GLN 3

关于活动线程减少的问题:抱歉,但您只需自己决定。要么立即释放不需要的线程(尽早终止它们),要么让它们运行直到完成(所有工作完成后终止它们)。这是你的选择。当然,您必须将所需数量的变量与实际线程数的变量分开。更新线程变量的实际数量(可能只是 List.Count)的问题对于两者来说完全相同,因为任一解决方案都需要一些时间。

关于多线程的管理:您可以研究这个答案,它将线程存储在 TList 中。不过,它需要根据您的具体愿望清单进行一些调整,如果需要帮助,请大声喊叫。此外,当然还有更多可能的实现可以从默认 TThread 的使用中派生出来。请注意,存在其他(多)线程库,但我从未需要使用它们。

  • FreeOnTerminate:=true。在任务队列上传递毒丸任务。没有标志,没有 TThread.WaitFor,不需要访问 TThread 实例,因此不需要线程列表,因此不需要线程列表管理,因此消除了大多数狡猾的死锁生成代码。 (2认同)