是否有PLINQ的异步版本?

usr*_*usr 14 .net asynchronous plinq task-parallel-library

我想在一定程度的并行处理并行处理项目的同时对数据流执行查询.通常情况下,我会使用PLINQ,但我的工作项不是CPU绑定的,而是IO绑定的.我想使用异步IO.PLINQ不支持异步工作.

运行PLINQ样式查询的最聪明方法是什么,但使用异步工作项?


以下是该问题的更详细说明:

我的目标是以下面的查询逻辑描述的方式处理可能无限的"项目"流:

var items = new int[10]; //simulate data

var results =
 from x in items.AsParallel().WithDegreeOfParallelism(100)
 where Predicate(x)
 select ComputeSomeValue(x);

foreach (var result in results)
 PerformSomeAction(result);
Run Code Online (Sandbox Code Playgroud)

此查询只是真实查询的草图.现在我希望每个占位符函数都是异步的(返回a Task和内部基于异步IO).

请注意,可能存在的内容远远多于可以存储在内存中的项目.我还必须控制并行度以最大化底层网络和磁盘硬件.

这个问题不是关于多核的.它完全适用于只有一个CPU内核的机器,因为IO仍然可以从并行性中受益.想想慢速的Web服务调用等.

Eni*_*ity 6

这听起来像是微软反应框架的工作.

我开始使用此代码作为我的初始变量:

var items = Enumerable.Range(0, 10).ToArray();

Func<int, bool> Predicate = x => x % 2 == 0;

Func<int, int> ComputeSomeValue = x =>
{
    Thread.Sleep(10000);
    return x * 3;
};
Run Code Online (Sandbox Code Playgroud)

现在,我使用常规LINQ查询作为基线:

var results =
    from x in items
    where Predicate(x)
    select ComputeSomeValue(x);
Run Code Online (Sandbox Code Playgroud)

这需要50秒来计算以下结果:

枚举

然后我切换到一个可观察的(反应式框架)查询:

var results =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select y;
Run Code Online (Sandbox Code Playgroud)

获得以下内容需要10秒钟:

可观察

它显然是并行计算的.

但是,结果出了问题.所以我将查询更改为:

var query =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };

var results =
    query
        .ToEnumerable()
        .OrderBy(z => z.x)
        .Select(z => z.y);
Run Code Online (Sandbox Code Playgroud)

那仍然需要10秒钟,但我以正确的顺序得到了结果.

现在,这里唯一的问题是WithDegreeOfParallelism.这里有一些东西可以尝试.

首先,我将代码更改为生成10,000个值,计算时间为10ms.我的标准LINQ查询仍然需要50秒.但反应性查询耗时6.3秒.如果它可以同时执行所有计算,它应该花费更少.这表明它正在最大化异步管道.

第二点是反应式框架使用调度程序进行所有工作调度.您可以尝试使用反应式框架附带的各种调度程序,以便在内置程序不能满足您的需求时找到替代方案.或者您甚至可以编写自己的调度程序来执行您喜欢的任何调度.


这是查询的一个版本,它也可以并行计算谓词.

var results =
    from x in items.ToObservable()
    from p in Observable.Start(() => Predicate(x))
    where p
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };
Run Code Online (Sandbox Code Playgroud)


tur*_*ula 1

如此处所述,PLINQ用于在多核/多处理器系统上并行运行LINQ 查询。对于拥有大量磁盘单元和超级网络功能的酷系统来说,这并没有太多作用。AFAIK,它是为了在更多内核上运行可执行代码而设计的,而不是为了同时向操作系统分派多个 I/O 请求。

也许您的Predicate(x)受 CPU 限制,因此您可以使用 PLINQ 执行该过滤操作。但是您不能以相同的方式应用 I/O 要求较高的操作(ComputeSomeValuePerformSomeAction )。

您可以做的是为每个项目定义一个操作链(在您的情况下有两个)(请参阅延续任务)并分派该链(按顺序(?))。

另外,您还提到了有关“无限的项目流”的内容。这听起来有点像生产者-消费者问题——如果这些项也是 I/O 生成的。

也许你的问题不是多核友好...它可能只是 I/O 要求高,仅此而已...

  • CPU 负载并不显着。但我确实需要异步 IO 和非常高的并行度来最大化 IO。这个问题完全适用于具有一个 CPU 核心的机器,因为 IO 完成将被复用到一个核心上。我可以定义一个“操作员链”,但项目太多,无法立即启动所有工作。我需要限制/保证的并行度。 (2认同)