标签: plinq

在多核机器上进行.NET操作的非线性扩展

我在.NET应用程序中遇到了一种奇怪的行为,它对一组内存数据执行一些高度并行的处理.

当在多核处理器(IntelCore2 Quad Q6600 2.4GHz)上运行时,它会展示非线性缩放,因为多个线程被启动以处理数据.

当作为单核上的非多线程循环运行时,该过程能够每秒完成大约240万次计算.当作为四个线程运行时,您可以预期吞吐量的四倍 - 在每秒900万次计算的某个地方 - 但是,唉,没有.在实践中,它每秒仅完成约4.1百万......与预期的吞吐量相当短.

此外,无论我使用PLINQ,线程池还是四个显式创建的线程,都会发生这种情况.很奇怪...

使用CPU时间没有其他任何东西在机器上运行,计算中也没有任何锁或其他同步对象......它应该只是在数据中前进.我已经通过在进程运行时查看perfmon数据来确认这一点(尽可能)...并且没有报告的线程争用或垃圾收集活动.

我的理论目前:

  1. 所有技术(线程上下文切换等)的开销都压倒了计算
  2. 线程没有被分配到四个核心中的每一个并且花费一些时间在同一个处理器核心上等待...不确定如何测试这个理论......
  3. .NET CLR线程未按预期优先级运行或具有一些隐藏的内部开销.

以下是代码中应该表现出相同行为的代表性摘录:

    var evaluator = new LookupBasedEvaluator();

    // find all ten-vertex polygons that are a subset of the set of points
    var ssg = new SubsetGenerator<PolygonData>(Points.All, 10);

    const int TEST_SIZE = 10000000;  // evaluate the first 10 million records

    // materialize the data into memory...
    var polygons = ssg.AsParallel()
                      .Take(TEST_SIZE)
                      .Cast<PolygonData>()
                      .ToArray();

    var sw1 = Stopwatch.StartNew();
    // for loop completes in about 4.02 seconds... …
Run Code Online (Sandbox Code Playgroud)

c# linq parallel-processing performance plinq

8
推荐指数
2
解决办法
893
查看次数

相当于并行中的 do() while{}

如何在下面的方法中创建与do-while或类似的并行等效项Update()

TestBuffer应用程序中的另一个线程随机写入。TestBuffer.RemoveItemAndDoSomethingWithIt();应该运行直到为TestBuffer空。目前Update()仅运行枚举时集合中的项目,这是有道理的。

internal class UnOrderedBuffer<T> where T : class
{
    ConcurrentBag<T> GenericBag = new ConcurrentBag<T>();
}

internal class Tester
{
    private UnOrderedBuffer<Data> TestBuffer;

    public void Update()
    {
        Parallel.ForEach(TestBuffer, Item =>
        {
            TestBuffer.RemoveItemAndDoSomethingWithIt();
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

c# parallel-processing plinq

8
推荐指数
2
解决办法
1万
查看次数

使用PLINQ Extensions时是否传输了线程标识?

我正在使用.AsParallel().ForAll()在ASP.NET请求的上下文中并行枚举一个集合.枚举方法依赖于System.Threading.Thread.CurrentPrincipal.

我可以依赖于用于将System.Threading.Thread.CurrentPrincipal设置为处理ASP.NET请求的线程的HttpContext.Current.User的单个线程,还是我需要自己管理?

问这个问题的另一种方法是PLINQ使用的线程是否继承了调用该操作的线程的标识?

c# asp.net multithreading plinq

8
推荐指数
1
解决办法
2686
查看次数

.NET 4 Parallel.ForEach和PLINQ:他们可以压倒线程池并杀死应用程序性能吗?

我在代码中使用Parallel.ForEach和PLINQ越多,我得到的面孔和代码审查就越多.所以我想知道我有什么理由不在每个LINQ语句中使用极端的PLINQ吗?运行时是否能够足够聪明地开始产生如此多的线程(或者从线程池中消耗这么多线程),应用程序性能实际上会降低而不是改进?同样的问题适用于并行库.

我确实理解与线程安全和使用多线程的开销相关的含义.我也意识到并不是一切都有利于并行化.所有我想知道我是否应该停止捍卫我的方法,只是放弃这两个好东西,因为我的同行认为我最好自己做线程控制而不是依靠.NET设施?

更新:请假设硬件足以满足使用多线程的先决条件.

.net c# linq parallel-processing plinq

8
推荐指数
1
解决办法
2051
查看次数

提升数学(ibeta_inv函数)不是线程安全吗?

我已经将boost的一部分 - ibeta_inv函数 - 编译成.Net 64位程序集,并且它工作得很好,直到我开始从多个线程调用它.然后它在某种程度上会返回错误的结果.

我使用此代码(C++/CLI)编写了它:

// Boost.h

#pragma once

#include <boost/math/special_functions/beta.hpp>

using namespace boost::math;

namespace Boost {

    public ref class BoostMath
    {
    public:
        double static InverseIncompleteBeta( double a, double b, double x )
        {
            return ibeta_inv(a,b,x);
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

有人曾尝试过这个吗?

我没有在.Net之外尝试过这个,所以我不知道这是不是原因,但我真的不明白为什么,因为它的单线程很好.

用法(C#):

private void calcBoost(List<Val> vals)
{
    //gives WRONG results (sometimes):
    vals.AsParallel().ForAll(v => v.BoostResult = BoostMath.InverseIncompleteBeta(v.A, v.B, v.X));
    //gives CORRECT results:
    vals.ForEach(v => v.BoostResult = BoostMath.InverseIncompleteBeta(v.A, v.B, v.X));
}
Run Code Online (Sandbox Code Playgroud)

更新:从我在下面的评论中可以看出 - 我完全不确定这是一个Boost问题.也许这是一些奇怪的PLinq到C++/CLI的bug?我被废除了,以后会回来更多的事实.

.net c# boost c++-cli plinq

8
推荐指数
1
解决办法
645
查看次数

LINQ之后的下一件大事是什么?

当它还处于测试版时,我开始使用LINQ(语言集成查询),更具体地说是Microsoft .NET LINQ Preview(2006年5月).差不多4年了,我们在很多项目中使用LINQ来完成最多样化的任务.

我甚至根据LINQ编写了我的最终大学项目.你看我喜欢它.

LINQ以及最近的PLINQ(并行LINQ)在提高编程能力和减少代码行数方面为我们的工作提供了极大的推动力,使我们能够获得更具表现力和可读性的代码.

我一直在想LINQ之后C#的下一个重大语言改进.

我知道有一些突出的语言功能作为代码合同等,但没有任何影响LINQ.

您认为下一件大事是什么?

c# linq language-features plinq

7
推荐指数
1
解决办法
1274
查看次数

如何使用PLINQ在C#中实现MapReduce?

如何使用PLINQ在C#中实现MapReduce?

假设您有7-8个WebServices来收集数据,并且每次接收(异步方式)您必须将这些数据放入数据库的某些表中,在我的例子中它是SQL Server 2008.例如,您从每个数据库获取的数据Web服务是:

 <employees>
  <employee>
   <name>Ramiz</name>
  </employee>
  <employee>
   <name>Aamir</name>
  </employee>
  <employee>
   <name>Zubair</name>
  </employee>
</employees>
Run Code Online (Sandbox Code Playgroud)

并且,在每次收到响应时,这些数据都会进入表名 - 员工:

Employee
===
EmployeeID (PK)
EmployeeName
Run Code Online (Sandbox Code Playgroud)

一旦数据进入表格,它必须返回json到客户端,即ASP.NET(MVC 3)应用程序使用客户端JavaScript(ajax)进行此调用.

假设,WebServiceEmployee1已返回数据,其他6个正在队列中(仍在尝试获取数据).然后,它应该将结果集注册到表而不是等待其他6并将插入的员工的数据返回到json中的客户端.而且,保持联系并做其他人同样的方式.

请参阅我的工具箱,我有ASP.NET MVC 3(Razor),SQL SERVER 2008 R2,jQuery.

谢谢.

c# mapreduce plinq c#-4.0 asp.net-mvc-3

7
推荐指数
1
解决办法
1840
查看次数

PLINQ查询给出溢出异常

我正在运行PLINQ查询,如下所示:

ParallelQuery<string> winningCombos = from n in nextComboMaker.GetNextCombo()
                                              .AsParallel().WithCancellation(_cancelSource.Token)
                                              where ComboWasAWinner(n) 
                                              select n;

ConcurrentBag<string> wins = new ConcurrentBag<string>();

foreach (var winningCombo in winningCombos)
{
        wins.Add(winningCombo);
        if (wins.Count == _maxWinsAllowed)
            break;
}
Run Code Online (Sandbox Code Playgroud)

GetNextCombo方法只返回下一个字母和数字组合,可能达到数十亿/万亿.

现在,当我选择一个大于Int32允许大小的组合范围时抛出异常,它总是在它运行的组合计数器为2147483584时抛出.

我已经确定GetNextCombo中没有任何东西通过创建一个假组合来每次返回(做一个收益率返回"234gf24fa23 ......"等)

LINQ抛出异常:

System.AggregateException was unhandled by user code
  Message=One or more errors occurred.
  Source=System.Core
  StackTrace:
       at System.Linq.Parallel.QueryTaskGroupState.QueryEnd(Boolean userInitiatedDispose)
       at System.Linq.Parallel.MergeExecutor`1.Execute[TKey](PartitionedStream`2 partitions, Boolean ignoreOutput, ParallelMergeOptions options, TaskScheduler taskScheduler, Boolean isOrdered, CancellationState cancellationState, Int32 queryId)
       at System.Linq.Parallel.PartitionedStreamMerger`1.Receive[TKey](PartitionedStream`2 partitionedStream)
       at System.Linq.Parallel.ForAllOperator`1.WrapPartitionedStream[TKey](PartitionedStream`2 inputStream, IPartitionedStreamRecipient`1 recipient, Boolean preferStriping, QuerySettings settings)
       at …
Run Code Online (Sandbox Code Playgroud)

c# linq plinq task-parallel-library

7
推荐指数
1
解决办法
1109
查看次数

AsParallel扩展如何实际工作

所以主题是问题.

我得到那个方法AsParallel返回ParallelQuery<TSource>使用相同LINQ关键字的包装器,System.Linq.ParallelEnumerable而不是System.Linq.Enumerable

这很清楚,但是当我查看反编译源时,我不明白它是如何工作的.

让我们从最简单的扩展开始:Sum()方法.码:

[__DynamicallyInvokable]
public static int Sum(this ParallelQuery<int> source)
{
  if (source == null)
    throw new ArgumentNullException("source");
  else
    return new IntSumAggregationOperator((IEnumerable<int>) source).Aggregate();
}
Run Code Online (Sandbox Code Playgroud)

很明显,我们去找Aggregate()方法吧.它是InternalAggregate方法的一个包装器,可以捕获一些异常.现在让我们来看看它.

protected override int InternalAggregate(ref Exception singularExceptionToThrow)
{
  using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions?(ParallelMergeOptions.FullyBuffered), true))
  {
    int num = 0;
    while (enumerator.MoveNext())
      checked { num += enumerator.Current; }
    return num;
  }
}
Run Code Online (Sandbox Code Playgroud)

这是一个问题:它是如何工作的?我发现变量没有并发安全性,由许多线程修改,我们只看到迭代器和求和.这是魔术调查员吗?或者它是如何工作的?GetEnumerator()返回QueryOpeningEnumerator<TOutput>,但它的代码太复杂了.

.net c# linq plinq task-parallel-library

7
推荐指数
1
解决办法
773
查看次数

撤消Enumerable.AsParallel(),返回串行模式

假设您有一个LINQ查询

source.AsParallel().Where(expensiveOperation).Select(cheapOperation)
Run Code Online (Sandbox Code Playgroud)

我想在这种情况下Select也可以并行执行模式运行.也许它只是一个便宜的操作i => i*2,所以有没有办法在链接方法查询时停止并行执行?

(也许像.AsParallel().Where(expensiveOp).AsSerial?().Select(cheapOp)?)

c# linq plinq

7
推荐指数
1
解决办法
115
查看次数