如何从新的线程库中使用TTask.WaitForAny?

Jim*_*ean 7 delphi multithreading delphi-xe7 rtl-ppl

为了尝试使用Delphi中的线程库并行计算任务并使用TTask.WaitForAny()获取第一个计算结果,异常暂时停止了执行.

在例外情况下调用堆栈:

$ 752D2F71的首次机会异常.异常类EMonitorLockException,消息'Object lock not owned'.处理Project1.exe(11248)

:752d2f71 KERNELBASE.RaiseException + 0x48
System.TMonitor.CheckOwningThread
System.ErrorAt(25,$408C70)
System.Error(reMonitorNotLocked)
System.TMonitor.CheckOwningThread
System.TMonitor.Exit
System.TMonitor.Exit($2180E40)
System.Threading.TTask.RemoveCompleteEvent(???)
System.Threading.TTask.DoWaitForAny((...),4294967295)
System.Threading.TTask.WaitForAny((...))
Project9.Parallel2
Project9.Project1
:74ff919f KERNEL32.BaseThreadInitThunk + 0xe
:7723b54f ntdll.RtlInitializeExceptionChain + 0x8f
:7723b51a ntdll.RtlInitializeExceptionChain + 0x5a
Run Code Online (Sandbox Code Playgroud)

调用堆栈得出的结论是异常是由线程库中的错误引起的,TMonitor和/或TTask.WaitForAny().为了验证这一点,代码被减少到最低限度:

program Project1;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Threading, System.Classes, System.SyncObjs,
  System.StrUtils;
var
  WorkerCount : integer = 1000;

function MyTaskProc: TProc;
begin
  result := procedure
    begin
      // Do something
    end;
end;

procedure Parallel2;
var
  i : Integer;
  Ticks: Cardinal;
  tasks: array of ITask;
  LTask: ITask;
  workProc: TProc;
begin
  workProc := MyTaskProc();
  Ticks := TThread.GetTickCount;
  SetLength(tasks, WorkerCount); // number of parallel tasks to undertake
  for i := 0 to WorkerCount - 1 do // parallel tasks
    tasks[i] := TTask.Run(workProc);
  TTask.WaitForAny(tasks); // wait for the first one to finish
  for LTask in tasks do
    LTask.Cancel; // kill the remaining tasks
  Ticks := TThread.GetTickCount - Ticks;
  WriteLn('Parallel time ' + Ticks.ToString + ' ms');
end;

begin
  try
    repeat
      Parallel2;
      WriteLn('finished');
    until FALSE;
  except
    on E: Exception do
      writeln(E.ClassName, ': ', E.Message);
  end;
  Readln;
end.
Run Code Online (Sandbox Code Playgroud)

现在错误会在一段时间后重现,并且验证了RTL错误.

这是作为RSP-10197 TTask.WaitForAny提交给Embarcadero的异常EMonitorLockException"Object lock not owned".


鉴于目前使用Delphi线程库无法解决这个问题,问题是:

是否有解决方法并行执行过程以获得第一个获得的解决方案?

LU *_* RD 4

下面是一个使用TParallel.For在产生答案时停止执行的示例。它使用TParallel.LoopState向并行 for 循环的其他成员发出信号。通过使用该.Stop信号,所有当前和待处理的迭代都应该停止。当前迭代应该检查loopState.Stopped.

procedure Parallel3(CS: TCriticalSection);
var
  Ticks: Cardinal;
  i,ix: Integer;  // variables that are only touched once in the Parallel.For loop
begin
  i := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(1,WorkerCount,
    procedure(index:Integer; loopState: TParallel.TLoopState)
    var
      k,l,m: Integer;
    begin
      // Do something complex
      k := (1000 - index)*1000;
      for l := 0 to Pred(k) do
        m := k div 1000;
      // If criteria to stop fulfilled:
      CS.Enter;
      Try
        if loopState.Stopped then // A solution was already found
          Exit;
        loopState.Stop;  // Signal 
        Inc(i);
        ix := index;
      Finally
        CS.Leave;
      End;
    end
  );
  Ticks := TThread.GetTickCount - Ticks;
  WriteLn('Parallel time ' + Ticks.ToString + ' ticks', ' i :',i,' index:',ix);
end;
Run Code Online (Sandbox Code Playgroud)

临界区保护计算结果,这里为简单起见 i,ix。


免责声明,鉴于库中存在大量错误System.Threading,我会推荐使用 OTL 框架的另一种解决方案。至少在图书馆打下稳定的基础之前是这样。