以下测试程序似乎没有下蹲.这是因为我正在测试一个小清单吗?
static void Main(string[] args)
{
List<int> list = 0.UpTo(4);
Test(list.AsParallel());
Test(list);
}
private static void Test(IEnumerable<int> input)
{
var timer = new Stopwatch();
timer.Start();
var size = input.Count();
if (input.Where(IsOdd).Count() != size / 2)
throw new Exception("Failed to count the odds");
timer.Stop();
Console.WriteLine("Tested " + size + " numbers in " + timer.Elapsed.TotalSeconds + " seconds");
}
private static bool IsOdd(int n)
{
Thread.Sleep(1000);
return n%2 == 1;
}
Run Code Online (Sandbox Code Playgroud)
两个版本都需要4秒才能运行.
假设我有一个IO绑定任务.我使用WithDegreeOfParallelism = 10和WithExecution = ForceParallelism模式,但查询仍然只使用两个线程.为什么?
我知道PLINQ通常会选择一个与我的核心数相等的并行度,但为什么它忽略了我对更高并行性的特定要求呢?
static void Main(string[] args)
{
TestParallel(0.UpTo(8));
}
private static void TestParallel(IEnumerable<int> input)
{
var timer = new Stopwatch();
timer.Start();
var size = input.Count();
if (input.AsParallel().
WithDegreeOfParallelism(10).
WithExecutionMode(ParallelExecutionMode.ForceParallelism).
Where(IsOdd).Count() != size / 2)
throw new Exception("Failed to count the odds");
timer.Stop();
Console.WriteLine("Tested " + size + " numbers in " + timer.Elapsed.TotalSeconds + " seconds");
}
private static bool IsOdd(int n)
{
Thread.Sleep(1000);
return n%2 == 1;
}
Run Code Online (Sandbox Code Playgroud) 我有一个IEnumerable的扩展方法,然后迭代集合,做它的业务,然后返回一个新的IEnumerable.
我试图使用.AsParallel().ForAll()来显着加速迭代(当然它应该这样做),但是当返回集合时,该集合中通常有一些对象是null.
我假设这可能是因为它在所有'业务'有机会完成之前返回集合?如果我调试并放入断点,则没有空值.
我应该使用某种"等待这个操作完成"的方法吗?
编辑:要更清楚一点,在forall中有业务逻辑,修改属性等.有必要循环一个动作,而不是简单地选择一些东西.
简介:我从System.Threading.Tasks.Parallel.ForEach和Concurrent Data结构更改为简单的plinq(Parallel Linq)查询.加速是惊人的.
那么plinq固有地比Parallel.ForEach快吗?或者它是否特定于任务.
// Original Code
// concurrent dictionary to store results
var resultDict = new ConcurrentDictionary<string, MyResultType>();
Parallel.ForEach(items, item =>
{
resultDict.TryAdd(item.Name, PerformWork(source));
});
// new code
var results =
items
.AsParallel()
.Select(item => new { item.Name, queryResult = PerformWork(item) })
.ToDictionary(kv => kv.SourceName, kv => kv.queryResult);
Run Code Online (Sandbox Code Playgroud)
注意:每个任务(PerformWork)现在运行0到200毫秒.在我优化它之前,它需要更长的时间.这就是我在第一时间使用Tasks.Parallel库的原因.所以我从总时间的2秒到大约100-200毫秒的总时间,执行大致相同的工作,只是使用不同的方法.(哇linq和plinq太棒了!)
问题:
PLINQ主要基于功能风格的编程而没有副作用,而副作用正是TPL的用途.如果你想实际并行工作而不是仅仅并行搜索/选择事物,你可以使用TPL.
我可以假设,因为我的模式基本上是功能性的(给输入产生没有突变的新输出),plinq是正确使用的技术吗?
我正在寻找验证我的假设是正确的,或者表明我错过了什么.
许多自定义Enumerable扩展可以根据其他内置操作实现 - 例如这个简单的方便方法:
public static bool AnyOf<TElement>(this TElement item, IEnumerable<TElement> items)
{
return items.Any(a => EqualityComparer<TElement>.Default.Equals(a, item));
}
Run Code Online (Sandbox Code Playgroud)
现在这将强制任何PLINQ查询回到顺序操作,即使PLINQ也有一个Any - 并且只相当于一个签名更改:
public static bool AnyOf<T>(this T item, ParallelQuery<T> items)
{
return items.Any(a => EqualityComparer<T>.Default.Equals(a, item));
}
Run Code Online (Sandbox Code Playgroud)
但是像这样复制它对我来说似乎很麻烦.
起初我认为类似下面的东西可能会起作用,但当然它不会^因为扩展方法是静态方法,因此决定调用Enumerable.Any而不是ParallelQuery.Any基于签名在编译时进行.
public static bool AnyOf<TElement, TEnumerable>(this TElement item, TEnumerable items)
where TEnumerable : class, IEnumerable<TElement>
{
return items.Any(a => EqualityComparer<TElement>.Default.Equals(a, item));
}
Run Code Online (Sandbox Code Playgroud)
我得出的结论是,如果没有使用不同的签名创建每个方法的副本,那是不可能的,但也许我错过了一些东西.(啧啧总是带着不可能的问题!)
或许可以从并行化(显然可以链接等)中受益的帮助器的更好示例是这样的.
public static IEnumerable<string> ToStrings(this IEnumerable<object> ienum)
{
return ienum.Select(a=> a.ToString());
}
Run Code Online (Sandbox Code Playgroud)
^编译器错误:
The type 'ParallelQuery<TElement>' …Run Code Online (Sandbox Code Playgroud) 我已经看过这个使用AsParallel()和检查条件的代码Any() :
bool IsAnyDeviceConnected()
{
return m_devices.Any(d => d.IsConnected);
}
Run Code Online (Sandbox Code Playgroud)
并使其更快:
bool IsAnyDeviceConnected()
{
return m_devices.AsParallel().Any(d => d.IsConnected);
}
Run Code Online (Sandbox Code Playgroud)
但看看Any():
internal static bool Any<T>(this IEnumerable<T> source, Func<T, bool> predicate) {
foreach (T element in source) {
if (predicate(element)) {
return true;
}
}
return false;
}
Run Code Online (Sandbox Code Playgroud)
我没有看到(显然) - 它确实关心取消其他工人 - 一旦找到.
但是 - 这个(其他)代码确实"尽快完成"+取消其他未来的工作:
bool IsAnyDeviceConnected()
{
var res = Parallel.ForEach(m_devices,
(d,loopState) => {
if (d.IsConnected)
loopState.Stop();
});
return …Run Code Online (Sandbox Code Playgroud) 我有大约250,000条记录标记为Boss,每个Boss有2到10名职员.我每天都需要了解员工的详细信息.大约有1,000,000名员工.我正在使用Linq获取每日工作人员的唯一列表.请考虑以下C#LINQ和模型
void Main()
{
List<Boss> BossList = new List<Boss>()
{
new Boss()
{
EmpID = 101,
Name = "Harry",
Department = "Development",
Gender = "Male",
Employees = new List<Person>()
{
new Person() {EmpID = 102, Name = "Peter", Department = "Development",Gender = "Male"},
new Person() {EmpID = 103, Name = "Emma Watson", Department = "Development",Gender = "Female"},
}
},
new Boss()
{
EmpID = 104,
Name = "Raj",
Department = "Development",
Gender = "Male",
Employees = new List<Person>()
{
new …Run Code Online (Sandbox Code Playgroud) 谁能告诉我Plinq的正确代码是什么?我将双数组中每个元素的正弦的绝对值的平方根加起来,但是Plinq给了我错误的结果.
该计划的输出是:
Linq聚合= 75.8310477905274(正确)Plinq聚合= 38.0263653589291(大约应该是它的一半)
我一定是做错了什么,但我找不到什么......
(我在Core 2 Duo Windows 7 x64 PC上运行Visual Studio 2008.)
这是代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections;
namespace ConsoleApplication1
{
class Program
{
static void Main()
{
double[] array = new double[100];
for (int i = 0; i < array.Length; ++i)
{
array[i] = i;
}
double sum1 = array.Aggregate((total, current) => total + Math.Sqrt(Math.Abs(Math.Sin(current))));
Console.WriteLine("Linq aggregate = " + sum1);
IParallelEnumerable<double> parray = array.AsParallel<double>();
double sum2 = parray.Aggregate((total, current) => …Run Code Online (Sandbox Code Playgroud) 我想在Windows Azure上的Worker进程中使用TPL.我想在队列中添加一个IJob,这有一个Run方法,因此worker将包含:
loop get item off queue使用TPL调用IJob.Run,这是一个异步调用
但我有点担心我可以添加到TPL的最大项目?如果需要,我很乐意构建我自己的某种TPL池,只需检查它的功能.
干杯,灰.
multithreading worker-process azure plinq task-parallel-library
我有一些无限的生成器方法,包括一些长时间运行和无限长时间运行的生成器.
IEnumerable<T> ExampleOne() {
while(true) // this one blocks for a few seconds at a time
yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() {
while(true) //this one blocks for a really long time
yield return OtherLongRunningFunction();
}
Run Code Online (Sandbox Code Playgroud)
我的目标是拥有一个无限序列,它结合了两个例子中的项目.这是我尝试过的,使用PLINQ:
IEnumerable<T> combined = new[] { ExampleOne(), ExampleTwo() }
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.SelectMany(source => source.GetRequests());
Run Code Online (Sandbox Code Playgroud)
这似乎恰当地将两个IEnumerables组合成一个新的,IEnumerable只要#1和#2中的项目出现在两个源中的任何一个中,它们就可用IEnumerables:
//assuming ExampleTwo yields TWO but happens roughly 5 times
//less often then ExampleOne
Example output: one one one one one TWO one one one …Run Code Online (Sandbox Code Playgroud)