usr*_*usr 14 .net asynchronous plinq task-parallel-library
我想在一定程度的并行处理并行处理项目的同时对数据流执行查询.通常情况下,我会使用PLINQ,但我的工作项不是CPU绑定的,而是IO绑定的.我想使用异步IO.PLINQ不支持异步工作.
运行PLINQ样式查询的最聪明方法是什么,但使用异步工作项?
以下是该问题的更详细说明:
我的目标是以下面的查询逻辑描述的方式处理可能无限的"项目"流:
var items = new int[10]; //simulate data
var results =
from x in items.AsParallel().WithDegreeOfParallelism(100)
where Predicate(x)
select ComputeSomeValue(x);
foreach (var result in results)
PerformSomeAction(result);
Run Code Online (Sandbox Code Playgroud)
此查询只是真实查询的草图.现在我希望每个占位符函数都是异步的(返回a Task和内部基于异步IO).
请注意,可能存在的内容远远多于可以存储在内存中的项目.我还必须控制并行度以最大化底层网络和磁盘硬件.
这个问题不是关于多核的.它完全适用于只有一个CPU内核的机器,因为IO仍然可以从并行性中受益.想想慢速的Web服务调用等.
这听起来像是微软反应框架的工作.
我开始使用此代码作为我的初始变量:
var items = Enumerable.Range(0, 10).ToArray();
Func<int, bool> Predicate = x => x % 2 == 0;
Func<int, int> ComputeSomeValue = x =>
{
Thread.Sleep(10000);
return x * 3;
};
Run Code Online (Sandbox Code Playgroud)
现在,我使用常规LINQ查询作为基线:
var results =
from x in items
where Predicate(x)
select ComputeSomeValue(x);
Run Code Online (Sandbox Code Playgroud)
这需要50秒来计算以下结果:

然后我切换到一个可观察的(反应式框架)查询:
var results =
from x in items.ToObservable()
where Predicate(x)
from y in Observable.Start(() => ComputeSomeValue(x))
select y;
Run Code Online (Sandbox Code Playgroud)
获得以下内容需要10秒钟:

它显然是并行计算的.
但是,结果出了问题.所以我将查询更改为:
var query =
from x in items.ToObservable()
where Predicate(x)
from y in Observable.Start(() => ComputeSomeValue(x))
select new { x, y };
var results =
query
.ToEnumerable()
.OrderBy(z => z.x)
.Select(z => z.y);
Run Code Online (Sandbox Code Playgroud)
那仍然需要10秒钟,但我以正确的顺序得到了结果.
现在,这里唯一的问题是WithDegreeOfParallelism.这里有一些东西可以尝试.
首先,我将代码更改为生成10,000个值,计算时间为10ms.我的标准LINQ查询仍然需要50秒.但反应性查询耗时6.3秒.如果它可以同时执行所有计算,它应该花费更少.这表明它正在最大化异步管道.
第二点是反应式框架使用调度程序进行所有工作调度.您可以尝试使用反应式框架附带的各种调度程序,以便在内置程序不能满足您的需求时找到替代方案.或者您甚至可以编写自己的调度程序来执行您喜欢的任何调度.
这是查询的一个版本,它也可以并行计算谓词.
var results =
from x in items.ToObservable()
from p in Observable.Start(() => Predicate(x))
where p
from y in Observable.Start(() => ComputeSomeValue(x))
select new { x, y };
Run Code Online (Sandbox Code Playgroud)
如此处所述,PLINQ用于在多核/多处理器系统上并行运行LINQ 查询。对于拥有大量磁盘单元和超级网络功能的酷系统来说,这并没有太多作用。AFAIK,它是为了在更多内核上运行可执行代码而设计的,而不是为了同时向操作系统分派多个 I/O 请求。
也许您的Predicate(x)受 CPU 限制,因此您可以使用 PLINQ 执行该过滤操作。但是您不能以相同的方式应用 I/O 要求较高的操作(ComputeSomeValue、PerformSomeAction )。
您可以做的是为每个项目定义一个操作链(在您的情况下有两个)(请参阅延续任务)并分派该链(按顺序(?))。
另外,您还提到了有关“无限的项目流”的内容。这听起来有点像生产者-消费者问题——如果这些项也是 I/O 生成的。
也许你的问题不是多核友好...它可能只是 I/O 要求高,仅此而已...
| 归档时间: |
|
| 查看次数: |
1027 次 |
| 最近记录: |