标签: parallel-extensions

BlockingCollection(Of T)的目的是什么

我试图在.NET 4上新的Parallel Stacks的上下文中理解BlockingCollection的目的.

MSDN文档说:

BlockingCollection用作IProducerConsumerCollection实例的包装器,允许从集合中删除尝试以阻止数据可用于删除.类似地,可以创建BlockingCollection以强制执行IProducerConsumerCollection中允许的数据元素数量的上限; 然后可以阻止对集合的添加尝试,直到空间可用于存储添加的项目.

但是,当我查看一些IProducerConsumerCollection的实现时,比如ConcurrentQueue,我看到它们提供了一个无锁,线程安全的实现.那么为什么需要BlockingCollection提供的锁机制呢?MSDN中的所有示例都显示通过BlockingCollection包装器使用这些集合,直接使用这些集合有哪些麻烦?使用BlockingCollection会产生什么好处?

.net parallel-extensions

19
推荐指数
2
解决办法
5054
查看次数

Parallel.ForEach不断产生新线程

当我在我的程序中使用Parallel.ForEach时,我发现有些线程似乎永远不会完成.事实上,它一直在反复产生新线程,这种行为是我没想到的,绝对不想要的.

我能够使用以下代码重现此行为,就像我的"真实"程序一样,它们都使用处理器和内存(.NET 4.0代码):

public class Node
{
    public Node Previous { get; private set; }

    public Node(Node previous)
    {
        Previous = previous;
    }
}

public class Program
{
    public static void Main(string[] args)
    {
        DateTime startMoment = DateTime.Now;
        int concurrentThreads = 0;

        var jobs = Enumerable.Range(0, 2000);
        Parallel.ForEach(jobs, delegate(int jobNr)
        {
            Interlocked.Increment(ref concurrentThreads);

            int heavyness = jobNr % 9;

            //Give the processor and the garbage collector something to do...
            List<Node> nodes = new List<Node>();
            Node current = null;
            for (int …
Run Code Online (Sandbox Code Playgroud)

.net c# concurrency parallel-extensions task-parallel-library

14
推荐指数
2
解决办法
8717
查看次数

Parallel.Foreach产生太多线程的方式

问题

虽然我将在这里谈论的代码我在F#中编写,但它基于.NET 4框架,而不是特别依赖于F#的任何特殊性(至少看起来如此!).

我的磁盘上有一些数据,我应该从网络更新,将最新版本保存到磁盘:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with …
Run Code Online (Sandbox Code Playgroud)

.net parallel-processing f# parallel-extensions task-parallel-library

13
推荐指数
3
解决办法
7719
查看次数

如何使用Joins PLINQ现有的LINQ查询?

我正在使用LINQ将两个DataSet相互比较以创建新行并更新现有行.我注意到完整的比较持续约1.5小时,并且两个核心中只有一个忙(任务管理器的CPU使用率为50-52%).我必须承认,我对并行LINQ完全不熟悉,但我认为它可以显着提高性能.

所以我的问题是,我应该如何以及应该并行化什么?

这些是原始查询(简化为要点):

'check for new data
Dim srcUnique = From row In src.Email_Total
                Select Ticket_ID = row.ticket_id, Interaction = row.interaction, ModifiedAt = row.modified_time

Dim destUnique = From row In dest.ContactDetail
                 Where row.ContactRow.fiContactType = emailContactType.idContactType
                 Select row.ContactRow.Ticket_ID, row.Interaction, row.ModifiedAt

'get all emails(contactdetails) that are in source but not in destination
Dim diffRows = srcUnique.Except(destUnique).ToList

'get all new emails(according to ticket_id) for calculating contact columns
Dim newRowsTickets = (From row In src.Email_Total
                     Join d In diffRows
                     On row.ticket_id Equals d.Ticket_ID _ …
Run Code Online (Sandbox Code Playgroud)

.net linq vb.net linq-to-objects parallel-extensions

13
推荐指数
1
解决办法
2485
查看次数

单元测试并发软件 - 你做什么?

随着软件越来越多并发,您如何使用单元测试来处理类型的核心行为(不是并行行为,只是核心行为)?

在过去的好时光中,你有一个类型,你打电话给它,你检查了它返回的内容和/或它所调用的其他内容.

现在,你调用一个方法,实际的工作计划在下一个可用的线程上运行; 你不知道什么时候它会真正启动并调用其他东西 - 更重要的是,其他东西也可能是并发的.

你怎么处理这个?你抽象/注入并发调度程序(例如抽象任务并行库并在单元测试中提供假/模拟)?

您遇到了哪些资源帮助了您?


编辑

我编辑了这个问题,强调测试类型的正常行为(忽略用于利用多核的任何并行机制,例如TPL)


parallel-processing concurrency unit-testing parallel-extensions

12
推荐指数
2
解决办法
1356
查看次数

CorrelationManager.LogicalOperationStack是否与Parallel.For,Tasks,Threads等兼容

有关背景信息,请参阅此问题:

任务并行库中的任务如何影响ActivityID?

该问题询问Tasks如何影响Trace.CorrelationManager.ActivityId.@Greg Samson用测试程序回答了他自己的问题,显示ActivityId在Tasks的上下文中是可靠的.测试程序在Task委托的开头设置一个ActivityId,休眠以模拟工作,然后检查最后的ActivityId以确保它是相同的值(即它没有被另一个线程修改).该程序成功运行.

在研究线程,任务和并行操作的其他"上下文"选项(最终为日志提供更好的上下文)时,我遇到了Trace.CorrelationManager.LogicalOperationStack的一个奇怪问题(无论如何我都很奇怪).我在下面的问题中复制了我的"答案".

我认为它充分描述了我遇到的问题(Trace.CorrelationManager.LogicalOperationStack显然已经损坏 - 或者什么 - 当在Parallel.For的上下文中使用时,但只有当Parallel.For本身包含在逻辑操作中时) .

这是我的问题:

  1. Trace.CorrelationManager.LogicalOperationStack应该可以与Parallel.For一起使用吗?如果是这样,如果一个逻辑操作已经与Parallel.For启动有效,它是否会有所不同?

  2. 是否有一种"正确"的方式使用LogicalOperationStack与Parallel.For?我能不同地对这个示例程序进行编码以使其"有效"吗?通过"工作",我的意思是LogicalOperationStack总是具有预期的条目数,并且条目本身是预期的条目.

我已经使用Threads和ThreadPool线程做了一些额外的测试,但是我必须返回并重试这些测试,看看我是否遇到了类似的问题.

我会说,看起来任务/并行线程和ThreadPool线程确实从父线程"继承"了Trace.CorrelationManager.ActivityId和Trace.CorrelationManager.LogicalOperationStack值.这是预期的,因为CorrelationManager使用CallContext的LogicalSetData方法(而不是SetData)存储这些值.

请再次参考此问题,以获取我在下面发布的"答案"的原始背景:

任务并行库中的任务如何影响ActivityID?

另请参阅Microsoft的Parallel Extensions论坛上的类似问题(目前尚未得到解答):

http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

[开始粘贴]

请原谅我发布这个作为答案,因为它不是你的问题的真正答案,但是,它与你的问题有关,因为它处理CorrelationManager行为和线程/任务/等.我一直在寻找使用CorrelationManager LogicalOperationStack(和StartLogicalOperation/StopLogicalOperation方法)在多线程场景中提供额外的上下文.

我拿了你的例子并稍微修改它以增加使用Parallel.For并行执行工作的能力.另外,我用StartLogicalOperation/StopLogicalOperation括号(内部)DoLongRunningWork.从概念上讲,DoLongRunningWork每次执行时都会执行以下操作:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation
Run Code Online (Sandbox Code Playgroud)

我发现如果我将这些逻辑操作添加到您的代码中(或多或少),所有逻辑操作都保持同步(始终是堆栈上预期的操作数,并且堆栈上的操作值始终为预期).

在我自己的一些测试中,我发现并非总是这样.逻辑操作堆栈正在"损坏".我能想到的最好的解释是,当"子"线程退出时,将CallContext信息"合并"回"父"线程上下文导致"旧"子线程上下文信息(逻辑操作)为"继承"由另一个兄弟姐妹线程.

问题也可能与Parallel.For显然使用主线程(至少在示例代码中,如编写)作为"工作线程"之一(或者在并行域中应该调用它们)之间的事实有关.每当执行DoLongRunningWork时,就会启动一个新的逻辑操作(在开始时)并停止(在结束时)(也就是说,将其推送到LogicalOperationStack并从中弹出).如果主线程已经有效的逻辑操作,并且DoLongRunningWork在主线程上执行,则启动新的逻辑操作,因此主线程的LogicalOperationStack现在具有两个操作.DoLongRunningWork的任何后续执行(只要DoLongRunningWork的这个"迭代"在主线程上执行)将(显然)继承主线程的LogicalOperationStack(现在它有两个操作,而不仅仅是一个预期的操作).

我花了很长时间才弄清楚为什么LogicalOperationStack的行为在我的示例中与我的示例的修改版本不同.最后我看到在我的代码中我将整个程序放在逻辑操作中,而在我的测试程序的修改版本中,我没有.这意味着在我的测试程序中,每次执行"工作"(类似于DoLongRunningWork)时,已经存在逻辑操作.在我的测试程序的修改版本中,我没有在逻辑操作中将整个程序括起来.

所以,当我修改你的测试程序以在逻辑操作中包含整个程序时如果我使用Parallel.For,我遇到了完全相同的问题.

使用上面的概念模型,这将成功运行:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
Run Code Online (Sandbox Code Playgroud)

虽然这最终会因为LogicalOperationStack显然不同步而断言:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation
Run Code Online (Sandbox Code Playgroud)

这是我的示例程序.它类似于你的,因为它有一个DoLongRunningWork方法来操作ActivityId以及LogicalOperationStack.我也有两种踢DoLongRunningWork的方式.一种风味使用任务一使用Parallel.For.还可以执行每种风格,使得整个并行操作被包含在逻辑操作中或不包含在逻辑操作中.因此,总共有4种方法来执行并行操作.要尝试每个,只需取消注释所需的"使用..."方法,重新编译并运行. UseTasks,UseTasks(true)并且UseParallelFor应该全部运行完成. UseParallelFor(true)因为LogicalOperationStack没有预期的条目数,所以会在某些时候断言.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text; …
Run Code Online (Sandbox Code Playgroud)

.net c# system.diagnostics parallel-extensions task-parallel-library

10
推荐指数
1
解决办法
4110
查看次数

在NUnit测试PLINQ代码后,如何防止AppDomainUnloadedException?

我该如何诊断,减少或预防AppDomainUnloadedException

AppDomainUnloadedException经过涉及PLINQ的长期(> 10s)测试后,NUnit 2.5.2一直在抛出.

回到2008年7月,Stephen Toub说:

是的,CTP中的调度程序不能很好地处理线程中止,这会导致进程在关闭的域中存在实时调度程序时崩溃(因为AppDomain关闭会导致该域中具有堆栈帧的所有线程上的线程中止).我们正努力为将来的版本强化这一点.

我尝试了很多解决方法,包括:

  • 在单独的方法中运行测试以消除杂散引用
  • 指定/domain:None为NUNit参数
  • legacyUnhandledAppDomainPolicy从中删除元素nunit-console.exe.config

我无法降级NUnit以降低竞争条件的可能性,因为我需要PLINQ来使我的参数测试更快.没有问题的NUnit版本不支持参数测试.

.net nunit appdomain parallel-extensions plinq

9
推荐指数
1
解决办法
1725
查看次数

为什么TaskFactory.StartNew方法不通用?

使用.NET 4.0中的TPL启动新的仅限副作用的任务(即:不返回结果的任务)的自动方式是使用以下API:

Task Task.Factory.StartNew(Action<object>, object)   
Run Code Online (Sandbox Code Playgroud)

但是为什么这个API的签名看起来不像这样

Task Task.Factory.StartNew<T>(Action<T>, T) 
Run Code Online (Sandbox Code Playgroud)

或者像这样

Task Task.Factory.StartNew<T>(T, Action<T>) 
Run Code Online (Sandbox Code Playgroud)

技术原因还是其他原因?

c# multithreading parallel-extensions

9
推荐指数
1
解决办法
2057
查看次数

使用Parallel Linq Extensions结合两个序列,如何才能首先产生最快的结果?

假设我有两个序列返回整数1到5.

第一个返回1,2和3非常快,但4和5每个需要200ms.

public static IEnumerable<int> FastFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i > 3) Thread.Sleep(200);
        yield return i;
    }
}
Run Code Online (Sandbox Code Playgroud)

第二个返回1,2和3,延迟时间为200ms,但快速返回4和5.

public static IEnumerable<int> SlowFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i < 4) Thread.Sleep(200);
        yield return i;
    }
}
Run Code Online (Sandbox Code Playgroud)

联合这两个序列只给出数字1到5.

FastFirst().Union(SlowFirst());
Run Code Online (Sandbox Code Playgroud)

我不能保证两种方法中的哪一种在什么时候有延迟,所以执行的顺序不能保证为我提供解决方案.因此,我想将联盟并行化,以便最小化我的例子中的(人为的)延迟.

一个真实场景:我有一个返回一些实体的缓存,以及一个返回所有实体的数据源.我希望能够从一个方法返回一个迭代器,该方法将请求内部并行化到缓存和数据源,以便缓存的结果尽可能快地生成.

注1:我意识到这仍然在浪费CPU周期; 我不是问我怎么能阻止序列迭代它们的慢元素,我怎么能尽可能快地结合它们.

更新1:我已经定制了achitaka-san对接受多个生成器的响应,并使用ContinueWhenAll将BlockingCollection的CompleteAdding设置为一次.我只是把它放在这里,因为它会因缺少注释格式而丢失.任何进一步的反馈都会很棒!

public static IEnumerable<TResult> SelectAsync<TResult>(
    params IEnumerable<TResult>[] producer)
{
    var resultsQueue = new BlockingCollection<TResult>();

    var taskList …
Run Code Online (Sandbox Code Playgroud)

.net c# parallel-processing parallel-extensions plinq

9
推荐指数
1
解决办法
1012
查看次数

实现并行无限循环的最佳方法是什么?

我已经习惯在.Net的并行扩展中使用Parallel.For(),因为它是一种简单的并行化代码的方法,而无需手动启动和维护线程(这可能是繁琐的).我现在正在看一个无限循环(做一些事情,直到我发出信号停止),我希望并行化,没有一个参数可以自由Parallel.For()重载这样做,所以想知道这里最好的方法是什么是.原则上我可以这样做:

Parallel.For(0, int.Max)
Run Code Online (Sandbox Code Playgroud)

但我怀疑这可能不是工作分区逻辑处理的预期/有效模式(?)

另一种选择是:

for(;;)
{
    Parallel.For(0, 128, delegate()
    {
       // Do stuff.
    }
}
Run Code Online (Sandbox Code Playgroud)

但这似乎不够优雅,也可能导致低效的工作分区.

现在我的直觉是通过创建和维护我自己的线程来手动执行此操作,但我有兴趣获得一些反馈/意见.谢谢.

===更新===

我在接受的答案中使用了文章中的代码的简化版本(我删除了ParallelOptions参数).这是代码......

public class ParallelUtils
{
    public static void While(Func<bool> condition, Action body) 
    { 
        Parallel.ForEach(IterateUntilFalse(condition), ignored => body()); 
    }

    private static IEnumerable<bool> IterateUntilFalse(Func<bool> condition) 
    { 
        while (condition()) yield return true; 
    }
}
Run Code Online (Sandbox Code Playgroud)

一个示例用法是:

Func<bool> whileCondFn = () => !_requestStopFlag;
ParallelUtils.While(whileCondFn, delegate()
{
    // Do stuff.


});
Run Code Online (Sandbox Code Playgroud)

.net c# parallel-processing parallel-extensions parallel-for

9
推荐指数
2
解决办法
6168
查看次数