iam*_*osy 8 delphi parallel-processing multithreading delphi-xe7
给定下面这个简单任务:统计一维数组中奇数的个数:
// Serial baseline: counts the odd elements of ArrXY and prints the elapsed ticks.
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  for i := 0 to MaxArr-1 do
    // Odd() selects odd values, so the counter matches its name
    // (a "mod 2 = 0" test would count even values instead).
    if Odd(ArrXY[i]) then
      Inc(odds);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Serial: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;
Run Code Online (Sandbox Code Playgroud)
这看起来很适合并行处理,因此人们可能会想使用下面的 TParallel.For 版本:
// Naive TParallel.For version, deliberately left unsynchronized to show the
// problem: every odd element does a plain Inc on the shared counter, so
// concurrent updates can be lost and the printed count is unreliable.
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(0, MaxArr-1, procedure(I: Integer)
    begin
      // Odd() selects odd values, so the counter matches its name.
      if Odd(ArrXY[I]) then
        inc(odds); // unprotected shared write: intentional for the demo
    end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel - false odds: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;
Run Code Online (Sandbox Code Playgroud)
这种并行计算的结果在两个方面有点出人意料:
1) 计算出的奇数个数是错误的
2) 执行时间比串行版本还要长
1) 这可以解释:我们没有保护对 odds 变量的并发访问。要解决这个问题,应改为使用 TInterlocked.Increment(odds);。
2) 这也可以解释:它体现了伪共享(false sharing)的影响。(严格来说,这里所有线程写的是同一个 odds 变量,属于对同一缓存行的争用。)
理想情况下,解决伪共享问题的办法是用局部变量保存中间结果,只在所有并行任务结束时再把这些中间结果汇总起来。这正是我真正想不明白的地方:有没有办法在匿名方法中使用(跨迭代保持的)局部变量?注意,仅仅在匿名方法体内声明局部变量是行不通的,因为每次迭代都会调用一次匿名方法体。如果这在某种程度上可行,又有没有办法在每个任务的迭代结束时,从匿名方法中取回我的中间结果?
编辑:我其实并不关心统计的是奇数还是偶数,只是用它来演示这种效果。
为完整起见,下面是一个演示该效果的控制台应用程序:
program Project4;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils, System.Threading, System.Classes, System.SyncObjs;

const
  // Number of elements in the test array.
  MaxArr = 100000000;

var
  Ticks: Cardinal;          // tick-count scratch value used by each benchmark
  odds: Integer;            // result counter shared by the benchmarks
  ArrXY: array of Integer;  // data set shared by all benchmarks

// Fills ArrXY with MaxArr pseudo-random values in [0, MaxInt).
// Randomize is not called, so the sequence is reproducible between runs.
procedure FillArray;
var
  i: Integer;
begin
  SetLength(ArrXY, MaxArr);
  for i := 0 to MaxArr-1 do
    ArrXY[i] := Random(MaxInt);
end;

// Correct but slow parallel version: every odd element performs an interlocked
// increment of the single shared counter, so all iterations contend on the
// same memory location.
procedure Parallel;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(0, MaxArr-1, procedure(I: Integer)
    begin
      if Odd(ArrXY[I]) then
        TInterlocked.Increment(odds);
    end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

// Parallel version that is deliberately left unsynchronized: the plain Inc on
// the shared counter can lose concurrent updates, so the count is unreliable.
procedure ParallelFalseResult;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(0, MaxArr-1, procedure(I: Integer)
    begin
      if Odd(ArrXY[I]) then
        inc(odds); // unprotected shared write: intentional for the demo
    end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel - false odds: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

// Single-threaded baseline.
procedure Serial;
var
  i: Integer; // local loop counter (a global for-loop variable only draws a warning)
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  for i := 0 to MaxArr-1 do
    // Odd() selects odd values, so the counter matches its name
    // (a "mod 2 = 0" test would count even values instead).
    if Odd(ArrXY[i]) then
      Inc(odds);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Serial: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

begin
  try
    FillArray;
    Serial;
    ParallelFalseResult;
    Parallel;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
  Readln;
end.
Run Code Online (Sandbox Code Playgroud)
Ste*_*nke 11
解决此问题的关键在于正确地划分数据,并尽可能减少共享。
使用下面的代码,运行速度几乎是串行代码的 4 倍。
const
  // Fixed number of worker tasks started by Parallel.
  WorkerCount = 4;

// Builds the body of worker number index (0-based).
// The worker counts the odd values in its own contiguous slice of ArrXY using a
// local counter, and stores its subtotal into oddsArr[index] once at the end.
// The last worker's slice also absorbs the remainder of MaxArr div WorkerCount.
function GetWorker(index: Integer; const oddsArr: TArray<Integer>): TProc;
var
  sliceSize, first, last: Integer;
begin
  sliceSize := MaxArr div WorkerCount;
  first := sliceSize * index;
  if index + 1 >= WorkerCount then
    last := MaxArr - 1
  else
    last := sliceSize * (index + 1) - 1;

  // first/last/index are per-call values, so every returned anonymous method
  // captures its own slice bounds.
  Result :=
    procedure
    var
      k, found: Integer;
    begin
      found := 0;
      for k := first to last do
        if Odd(ArrXY[k]) then
          Inc(found);
      oddsArr[index] := found;
    end;
end;
// Starts WorkerCount tasks, each counting the odd values in its own slice of
// ArrXY. Waits for all of them, then sums the per-worker subtotals into the
// global odds counter and prints the elapsed ticks.
procedure Parallel;
var
  w: Integer;
  oddsArr: TArray<Integer>;
  workers: TArray<ITask>;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  SetLength(oddsArr, WorkerCount);
  SetLength(workers, WorkerCount);

  for w := Low(workers) to High(workers) do
    workers[w] := TTask.Run(GetWorker(w, oddsArr));
  TTask.WaitForAll(workers);

  // Shared state is touched only here, after every worker has finished.
  for w := 0 to WorkerCount - 1 do
    odds := odds + oddsArr[w];

  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;
Run Code Online (Sandbox Code Playgroud)
您也可以用 TParallel.For 编写类似的代码,但它仍然比使用 TTask 的版本慢一些(约为串行速度的 3 倍)。
顺便说一下,我之所以用一个函数来返回 worker 的 TProc,是为了正确捕获 index。如果在同一个例程的循环中直接创建匿名方法,捕获的将是循环变量本身(所有匿名方法共享同一个变量),而不是每次迭代时的值。
2014年12月19日更新:
既然我们已经发现关键在于正确分区,就可以很容易地把它封装成一个通用的并行 for 循环,而不必绑定到特定的数据结构:
// Runs iteratorRangeEvent in parallel over the inclusive index range
// [lowInclusive..highInclusive].
// The range is cut into one contiguous slice per TThread.ProcessorCount. Each
// slice is handed to its own task as an inclusive (min, max) pair, and the call
// returns after all tasks have finished.
// A slice can be empty (min > max) when the range has fewer elements than there
// are processors, so the callback must tolerate that (a plain for-loop does).
procedure ParallelFor(lowInclusive, highInclusive: Integer;
  const iteratorRangeEvent: TProc<Integer, Integer>);

  // Computes the absolute inclusive bounds of slice number index when [lo..hi]
  // is split into count slices. Every slice has (len div count) elements,
  // except the last one, which also receives the remainder and ends at hi.
  procedure CalcPartBounds(lo, hi, count, index: Integer;
    out min, max: Integer);
  var
    len, partLen: Integer;
  begin
    len := hi - lo + 1;
    partLen := len div count;
    // Offset by lo so that ranges which do not start at 0 are honoured.
    min := lo + partLen * index;
    if index + 1 < count then
      max := lo + partLen * (index + 1) - 1
    else
      max := hi;
  end;

  // Starts a task that calls iteratorRangeEvent(min, max). min and max are
  // parameters of this function, so each anonymous method captures its own
  // copy rather than the caller's loop variables.
  function GetWorker(const iteratorRangeEvent: TProc<Integer, Integer>;
    min, max: Integer): ITask;
  begin
    Result := TTask.Run(
      procedure
      begin
        iteratorRangeEvent(min, max);
      end);
  end;

var
  workerCount: Integer;
  workers: TArray<ITask>;
  i, min, max: Integer;
begin
  workerCount := TThread.ProcessorCount;
  SetLength(workers, workerCount);
  for i := 0 to workerCount - 1 do
  begin
    CalcPartBounds(lowInclusive, highInclusive, workerCount, i, min, max);
    workers[i] := GetWorker(iteratorRangeEvent, min, max);
  end;
  TTask.WaitForAll(workers);
end;
// Counts the odd values of ArrXY with the range-based ParallelFor.
// Each task counts its slice into a local variable and adds that subtotal to
// the global odds counter with a single atomic add. Elapsed ticks are printed.
procedure Parallel4;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  ParallelFor(0, MaxArr-1,
    procedure(first, last: Integer)
    var
      idx, localCount: Integer;
    begin
      localCount := 0;
      for idx := first to last do
        if Odd(ArrXY[idx]) then
          Inc(localCount);
      // One atomic update per slice instead of one per element.
      AtomicIncrement(odds, localCount);
    end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('ParallelEx: Stefan Glienke ' + Ticks.ToString + ' ms, odds: ' + odds.ToString);
end;
Run Code Online (Sandbox Code Playgroud)
关键是用局部变量计数,最后只访问一次共享变量,把各部分的小计加进去。
使用 SVN 中的 OmniThreadLibrary(该功能尚未包含在任何正式发布版本中),可以按下面的方式编写代码,从而无需对共享计数器进行互锁访问。
// Counts the odd values of ArrXY with OmniThreadLibrary's Parallel.For, using
// one task per CPU reported by Environment.Process.Affinity.Count.
// Each task increments only its own counter slot (selected by taskIndex), so no
// interlocked operations are needed; the slots are summed at the end.
function CountParallelOTL: integer;
const
  // Each task's counter is placed CCounterStride integers apart, so that no two
  // counters share a cache line. Otherwise tasks running on different cores keep
  // invalidating each other's line on every increment (false sharing).
  // NOTE(review): assumes 64-byte cache lines (typical for x86/x64); adjust for
  // other targets.
  CCounterStride = 64 div SizeOf(integer);
var
  counters: array of integer;
  numCores: integer;
  i: integer;
begin
  numCores := Environment.Process.Affinity.Count;
  SetLength(counters, numCores * CCounterStride);
  FillChar(counters[0], Length(counters) * SizeOf(counters[0]), 0);
  // taskIndex is presumably 0..numCores-1 for NumTasks(numCores); confirm
  // against the OTL documentation.
  Parallel.For(0, MaxArr - 1)
    .NumTasks(numCores)
    .Execute(
      procedure(taskIndex, value: integer)
      begin
        if Odd(ArrXY[value]) then
          Inc(counters[taskIndex * CCounterStride]);
      end);
  Result := 0;
  for i := 0 to numCores - 1 do
    Inc(Result, counters[i * CCounterStride]);
end;
Run Code Online (Sandbox Code Playgroud)
然而,这在最好的情况下也只与顺序循环相当,在最坏的情况下则慢好几倍。
我将它与 Stefan 的解决方案(XE7 任务)以及使用互锁增量的简单 XE7 TParallel.For(XE7 for)进行了比较。
在我具有 4 个超线程核心的笔记本电脑上的结果:
串行:543 毫秒内找到 49999640 个奇数元素
并行 (OTL):555 毫秒内找到 49999640 个奇数元素
并行 (XE7 任务):136 毫秒内找到 49999640 个奇数元素
并行 (XE7 for):1667 毫秒内找到 49999640 个奇数元素
在我具有 12 个超线程核心的工作站上的结果:
串行:685 毫秒内找到 50005291 个奇数元素
并行 (OTL):1309 毫秒内找到 50005291 个奇数元素
并行 (XE7 任务):62 毫秒内找到 50005291 个奇数元素
并行 (XE7 for):3379 毫秒内找到 50005291 个奇数元素
由于不需要互锁增量,它相比 System.Threading 的 TParallel.For 有很大改进,但手工编写的解决方案要快得多。
完整测试程序:
program ParallelCount;
{$APPTYPE CONSOLE}
{$R *.res}
uses
System.SyncObjs,
System.Classes,
System.SysUtils,
System.Threading,
DSiWin32,
OtlCommon,
OtlParallel;
const
  // Number of elements in the test array.
  MaxArr = 100000000;

var
  Ticks: Cardinal;          // tick-count scratch value
  i, odds: Integer;
  ArrXY: array of Integer;  // data set shared by all counting variants
// Fills the global ArrXY with MaxArr pseudo-random values in [0, MaxInt).
// Randomize is not called in this program, so the data is reproducible
// between runs.
procedure FillArray;
var
  i: Integer;
begin
  SetLength(ArrXY, MaxArr);
  for i := 0 to MaxArr-1 do
    ArrXY[i] := Random(MaxInt);
end;
// Single-threaded baseline: returns the number of odd values in ArrXY.
function CountSerial: integer;
var
  i: integer;    // local loop counter (a global for-loop variable only draws a warning)
  odds: integer;
begin
  odds := 0;
  for i := 0 to MaxArr-1 do
    if Odd(ArrXY[i]) then
      Inc(odds);
  Result := odds;
end;
// Counts the odd values of ArrXY with OmniThreadLibrary's Parallel.For, using
// one task per CPU reported by Environment.Process.Affinity.Count.
// Each task increments only its own counter slot (selected by taskIndex), so no
// interlocked operations are needed; the slots are summed at the end.
function CountParallelOTL: integer;
const
  // Each task's counter is placed CCounterStride integers apart, so that no two
  // counters share a cache line. Otherwise tasks running on different cores keep
  // invalidating each other's line on every increment (false sharing).
  // NOTE(review): assumes 64-byte cache lines (typical for x86/x64); adjust for
  // other targets.
  CCounterStride = 64 div SizeOf(integer);
var
  counters: array of integer;
  numCores: integer;
  i: integer;
begin
  numCores := Environment.Process.Affinity.Count;
  SetLength(counters, numCores * CCounterStride);
  FillChar(counters[0], Length(counters) * SizeOf(counters[0]), 0);
  // taskIndex is presumably 0..numCores-1 for NumTasks(numCores); confirm
  // against the OTL documentation.
  Parallel.For(0, MaxArr - 1)
    .NumTasks(numCores)
    .Execute(
      procedure(taskIndex, value: integer)
      begin
        if Odd(ArrXY[value]) then
          Inc(counters[taskIndex * CCounterStride]);
      end);
  Result := 0;
  for i := 0 to numCores - 1 do
    Inc(Result, counters[i * CCounterStride]);
end;
// Returns the body of worker number index (0-based) out of workerCount.
// The worker counts the odd values in its contiguous slice of ArrXY into a
// local variable and writes the subtotal to oddsArr[index] once, when done.
// The last worker's slice also includes the remainder of MaxArr div workerCount.
function GetWorker(index: Integer; const oddsArr: TArray<Integer>; workerCount: integer): TProc;
var
  chunk, first, last: Integer;
begin
  chunk := MaxArr div workerCount;
  first := chunk * index;
  if index + 1 >= workerCount then
    last := MaxArr - 1
  else
    last := chunk * (index + 1) - 1;

  Result :=
    procedure
    var
      k, subtotal: Integer;
    begin
      subtotal := 0;
      for k := first to last do
        if Odd(ArrXY[k]) then
          Inc(subtotal);
      oddsArr[index] := subtotal;
    end;
end;
// Counts the odd values of ArrXY with one System.Threading task per CPU
// reported by Environment.Process.Affinity.Count.
// Each task stores its subtotal in its own oddsArr slot; the slots are summed
// into Result after all tasks have finished. Timing is measured by the caller,
// so no global counters or tick values are touched here.
function CountParallelXE7Tasks: integer;
var
  i: Integer;
  oddsArr: TArray<Integer>;
  workers: TArray<ITask>;
  workerCount: integer;
begin
  workerCount := Environment.Process.Affinity.Count;
  SetLength(oddsArr, workerCount);
  SetLength(workers, workerCount);
  for i := 0 to workerCount-1 do
    workers[i] := TTask.Run(GetWorker(i, oddsArr, workerCount));
  TTask.WaitForAll(workers);
  Result := 0;
  for i := 0 to workerCount-1 do
    Inc(Result, oddsArr[i]);
end;
// Counts the odd values of ArrXY with System.Threading's TParallel.For.
// Every odd element triggers an interlocked increment of the single captured
// counter, so all iterations contend on one variable.
function CountParallelXE7For: integer;
var
  total: integer;
begin
  total := 0;
  TParallel.For(0, MaxArr - 1,
    procedure(idx: Integer)
    begin
      if Odd(ArrXY[idx]) then
        TInterlocked.Increment(total);
    end);
  Result := total;
end;
// Runs func once, measures the elapsed time with DSiTimeGetTime64 /
// DSiElapsedTime64, and prints the returned count and the timing under name.
procedure Count(const name: string; func: TFunc<integer>);
var
  startTime, elapsed: int64;
  found: integer;
begin
  startTime := DSiTimeGetTime64;
  found := func();
  elapsed := DSiElapsedTime64(startTime);
  Writeln(name, ': ', found, ' odd elements found in ', elapsed, ' ms');
end;
begin
  try
    FillArray;
    // Every variant counts the odd elements of the same array and prints its timing.
    Count('Serial', CountSerial);
    Count('Parallel (OTL)', CountParallelOTL);
    Count('Parallel (XE7 tasks)', CountParallelXE7Tasks);
    Count('Parallel (XE7 for)', CountParallelXE7For);
    Readln;
  except
    on E: Exception do
    begin
      Writeln(E.ClassName, ': ', E.Message);
    end;
  end;
end.
Run Code Online (Sandbox Code Playgroud)