标签: plinq

AsParallel究竟是如何工作的?

以下测试程序似乎没有下蹲.这是因为我正在测试一个小清单吗?

static void Main(string[] args)
{
    List<int> list = 0.UpTo(4);

    Test(list.AsParallel());
    Test(list);
}

private static void Test(IEnumerable<int> input)
{
    var timer = new Stopwatch();
    timer.Start();
    var size = input.Count();
    if (input.Where(IsOdd).Count() != size / 2)
        throw new Exception("Failed to count the odds");

    timer.Stop();
    Console.WriteLine("Tested " + size + " numbers in " + timer.Elapsed.TotalSeconds + " seconds");
}

private static bool IsOdd(int n)
{
    Thread.Sleep(1000);
    return n%2 == 1;
}
Run Code Online (Sandbox Code Playgroud)

两个版本都需要4秒才能运行.

.net c# plinq

7
推荐指数
2
解决办法
1万
查看次数

为什么PLINQ只使用两个线程?

假设我有一个IO绑定任务.我使用WithDegreeOfParallelism = 10和WithExecution = ForceParallelism模式,但查询仍然只使用两个线程.为什么?

我知道PLINQ通常会选择一个与我的核心数相等的并行度,但为什么它忽略了我对更高并行性的特定要求呢?

static void Main(string[] args)
{
    TestParallel(0.UpTo(8));
}

private static void TestParallel(IEnumerable<int> input)
{
    var timer = new Stopwatch();
    timer.Start();
    var size = input.Count();

    if (input.AsParallel().
        WithDegreeOfParallelism(10).
        WithExecutionMode(ParallelExecutionMode.ForceParallelism).
        Where(IsOdd).Count() != size / 2)
        throw new Exception("Failed to count the odds");

    timer.Stop();
    Console.WriteLine("Tested " + size + " numbers in " + timer.Elapsed.TotalSeconds + " seconds");
}

private static bool IsOdd(int n)
{
    Thread.Sleep(1000);
    return n%2 == 1;
}
Run Code Online (Sandbox Code Playgroud)

.net c# plinq

7
推荐指数
1
解决办法
4283
查看次数

使用PLINQ返回null

我有一个IEnumerable的扩展方法,然后迭代集合,做它的业务,然后返回一个新的IEnumerable.

我试图使用.AsParallel().ForAll()来显着加速迭代(当然它应该这样做),但是当返回集合时,该集合中通常有一些对象是null.

我假设这可能是因为它在所有'业务'有机会完成之前返回集合?如果我调试并放入断点,则没有空值.

我应该使用某种"等待这个操作完成"的方法吗?

编辑:要更清楚一点,在forall中有业务逻辑,修改属性等.有必要循环一个动作,而不是简单地选择一些东西.

c# plinq

7
推荐指数
1
解决办法
521
查看次数

PLinq本质上比System.Threading.Tasks.Parallel.ForEach更快

简介:我从System.Threading.Tasks.Parallel.ForEach和Concurrent Data结构更改为简单的plinq(Parallel Linq)查询.加速是惊人的.

那么plinq固有地比Parallel.ForEach快吗?或者它是否特定于任务.

// Original Code
// concurrent dictionary to store results
var resultDict = new ConcurrentDictionary<string, MyResultType>();

Parallel.ForEach(items, item =>
        {
            resultDict.TryAdd(item.Name, PerformWork(source));
        });


// new code

var results =
            items
            .AsParallel()
            .Select(item => new { item.Name, queryResult = PerformWork(item) })
            .ToDictionary(kv => kv.SourceName, kv => kv.queryResult);
Run Code Online (Sandbox Code Playgroud)

注意:每个任务(PerformWork)现在运行0到200毫秒.在我优化它之前,它需要更长的时间.这就是我在第一时间使用Tasks.Parallel库的原因.所以我从总时间的2秒到大约100-200毫秒的总时间,执行大致相同的工作,只是使用不同的方法.(哇linq和plinq太棒了!)

问题:

  1. 是否因为使用plinq vs Parallel.ForEach而加速?
  2. 它只是简单地删除并发数据结构(ConcurrentDictionary)?(因为它不需要同步线程).
  3. 根据这个相关问题的答案

PLINQ主要基于功能风格的编程而没有副作用,而副作用正是TPL的用途.如果你想实际并行工作而不是仅仅并行搜索/选择事物,你可以使用TPL.

我可以假设,因为我的模式基本上是功能性的(给输入产生没有突变的新输出),plinq是正确使用的技术吗?

我正在寻找验证我的假设是正确的,或者表明我错过了什么.

c# linq concurrency plinq task-parallel-library

7
推荐指数
1
解决办法
3456
查看次数

利用PLINQ和自定义Enumerable Extensions

许多自定义Enumerable扩展可以根据其他内置操作实现 - 例如这个简单的方便方法:

public static bool AnyOf<TElement>(this TElement item, IEnumerable<TElement> items)
{
    return items.Any(a => EqualityComparer<TElement>.Default.Equals(a, item));
}
Run Code Online (Sandbox Code Playgroud)

现在这将强制任何PLINQ查询回到顺序操作,即使PLINQ也有一个Any - 并且只相当于一个签名更改:

public static bool AnyOf<T>(this T item, ParallelQuery<T> items)
{
    return items.Any(a => EqualityComparer<T>.Default.Equals(a, item));
}
Run Code Online (Sandbox Code Playgroud)

但是像这样复制它对我来说似乎很麻烦.

起初我认为类似下面的东西可能会起作用,但当然它不会^因为扩展方法是静态方法,因此决定调用Enumerable.Any而不是ParallelQuery.Any基于签名在编译时进行.

public static bool AnyOf<TElement, TEnumerable>(this TElement item, TEnumerable items)
    where TEnumerable : class, IEnumerable<TElement>
{
    return items.Any(a => EqualityComparer<TElement>.Default.Equals(a, item));
}
Run Code Online (Sandbox Code Playgroud)

我得出的结论是,如果没有使用不同的签名创建每个方法的副本,那是不可能的,但也许我错过了一些东西.(啧啧总是带着不可能的问题!)


或许可以从并行化(显然可以链接等)中受益的帮助器的更好示例是这样的.

public static IEnumerable<string> ToStrings(this IEnumerable<object> ienum)
{
    return ienum.Select(a=> a.ToString());
}
Run Code Online (Sandbox Code Playgroud)

^编译器错误:

 The type 'ParallelQuery<TElement>' …
Run Code Online (Sandbox Code Playgroud)

c# linq plinq

7
推荐指数
1
解决办法
305
查看次数

AsParallel()和Any()?

我已经看过这个使用AsParallel()和检查条件的代码Any() :

bool IsAnyDeviceConnected()
{
   return m_devices.Any(d => d.IsConnected);
}
Run Code Online (Sandbox Code Playgroud)

并使其更快:

bool IsAnyDeviceConnected()
{
   return m_devices.AsParallel().Any(d => d.IsConnected);
}
Run Code Online (Sandbox Code Playgroud)

但看看Any():

 internal static bool Any<T>(this IEnumerable<T> source, Func<T, bool> predicate) {
            foreach (T element in source) {
                if (predicate(element)) {
                    return true;
                }
            }
            return false;
        }
Run Code Online (Sandbox Code Playgroud)

我没有看到(显然) - 它确实关心取消其他工人 - 一旦找到.

但是 - 这个(其他)代码确实"尽快完成"+取消其他未来的工作:

bool IsAnyDeviceConnected()
{
   var res = Parallel.ForEach(m_devices,
      (d,loopState) => {  
         if (d.IsConnected) 
            loopState.Stop();
      });
   return …
Run Code Online (Sandbox Code Playgroud)

c# .net-4.0 plinq

7
推荐指数
3
解决办法
1201
查看次数

如何有效地使用大数据集中LINQ并行中的子句或选择

我有大约250,000条记录标记为Boss,每个Boss有2到10名职员.我每天都需要了解员工的详细信息.大约有1,000,000名员工.我正在使用Linq获取每日工作人员的唯一列表.请考虑以下C#LINQ和模型

void Main()
{

    List<Boss> BossList = new List<Boss>()
    {
        new Boss()
        {
            EmpID = 101,
            Name = "Harry",
            Department = "Development",
            Gender = "Male",
            Employees = new List<Person>()
            {
                new Person() {EmpID = 102, Name = "Peter", Department = "Development",Gender = "Male"},
                new Person() {EmpID = 103, Name = "Emma Watson", Department = "Development",Gender = "Female"},

            }
        },
        new Boss()
        {
            EmpID = 104,
            Name = "Raj",
            Department = "Development",
            Gender = "Male",
            Employees = new List<Person>()
                    {
                        new …
Run Code Online (Sandbox Code Playgroud)

c# linq plinq

7
推荐指数
1
解决办法
608
查看次数

Plinq给出了Linq不同的结果 - 我做错了什么?

谁能告诉我Plinq的正确代码是什么?我将双数组中每个元素的正弦的绝对值的平方根加起来,但是Plinq给了我错误的结果.

该计划的输出是:

Linq聚合= 75.8310477905274(正确)Plinq聚合= 38.0263653589291(大约应该是它的一半)

我一定是做错了什么,但我找不到什么......

(我在Core 2 Duo Windows 7 x64 PC上运行Visual Studio 2008.)

这是代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main()
        {
            double[] array = new double[100];

            for (int i = 0; i < array.Length; ++i)
            {
                array[i] = i;
            }

            double sum1 = array.Aggregate((total, current) => total + Math.Sqrt(Math.Abs(Math.Sin(current))));
            Console.WriteLine("Linq aggregate = " + sum1);

            IParallelEnumerable<double> parray = array.AsParallel<double>();
            double sum2 = parray.Aggregate((total, current) => …
Run Code Online (Sandbox Code Playgroud)

c# linq plinq

6
推荐指数
1
解决办法
714
查看次数

TPL中的最大任务?

我想在Windows Azure上的Worker进程中使用TPL.我想在队列中添加一个IJob,这有一个Run方法,因此worker将包含:

loop get item off queue使用TPL调用IJob.Run,​​这是一个异步调用

但我有点担心我可以添加到TPL的最大项目?如果需要,我很乐意构建我自己的某种TPL池,只需检查它的功能.

干杯,灰.

multithreading worker-process azure plinq task-parallel-library

6
推荐指数
1
解决办法
2250
查看次数

试图在长时间运行的发电机上使用PLINQ的陷阱?

我有一些无限的生成器方法,包括一些长时间运行和无限长时间运行的生成器.

IEnumerable<T> ExampleOne() { 
    while(true) // this one blocks for a few seconds at a time
        yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() { 
    while(true) //this one blocks for a really long time
        yield return OtherLongRunningFunction();
}
Run Code Online (Sandbox Code Playgroud)

我的目标是拥有一个无限序列,它结合了两个例子中的项目.这是我尝试过的,使用PLINQ:

 IEnumerable<T> combined =  new[] { ExampleOne(), ExampleTwo() }
           .AsParallel()
           .WithMergeOptions(ParallelMergeOptions.NotBuffered)
           .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
           .SelectMany(source => source.GetRequests());
Run Code Online (Sandbox Code Playgroud)

这似乎恰当地将两个IEnumerables组合成一个新的,IEnumerable只要#1和#2中的项目出现在两个源中的任何一个中,它们就可用IEnumerables:

//assuming ExampleTwo yields TWO but happens roughly 5 times 
//less often then ExampleOne
Example output:  one one one one one TWO one one one …
Run Code Online (Sandbox Code Playgroud)

c# plinq system.reactive

6
推荐指数
1
解决办法
407
查看次数