我有一块C#5.0代码,可以生成大量的网络和磁盘I/O. 我需要并行运行此代码的多个副本.以下哪种技术可能会给我带来最佳性能:
等待的异步方法
直接使用TPL中的Task
TPL数据流nuget
反应性扩展
我不太擅长这种平行的东西,但如果使用较低的杠杆,比如说Thread,可以给我更好的性能,我也会考虑.
conceptual task-parallel-library system.reactive async-await tpl-dataflow
我知道......我并没有真正使用TplDataflow来发挥它的最大潜力.ATM我只是BufferBlock用作消息传递的安全队列,其中生产者和消费者以不同的速率运行.我看到一些奇怪的行为,让我难以理解如何继续.
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
Run Code Online (Sandbox Code Playgroud)
在上面的代码中(它是2000行分布式解决方案的一部分),Send每100ms左右定期调用一次.这意味着一个项目被Post编到messageQueue在约10次.这已经过验证.但是,偶尔看起来ReceiveAsync在超时内没有完成(即Post没有导致ReceiveAsync完成)并且TimeoutException在30秒后被提升.在这一点上,messageQueue.Count是数百.这是出乎意料的.这个问题已经在较慢的发布率(1个帖子/秒)中观察到,并且通常在1000个项目通过之前发生BufferBlock.
因此,要解决此问题,我使用以下代码,它可以工作,但偶尔会在接收时导致1秒延迟(由于上面发生的错误)
public async …Run Code Online (Sandbox Code Playgroud) TPL Dataflow提供TransformBlock转换输入,例如:
var tb = new TransformBlock<int, int>(i => i * 2);
Run Code Online (Sandbox Code Playgroud)
是否有可能不输出一些输入,例如,如果输入未通过某些验证测试?
var tb = new TransformBlock<InputType, OutputType>(i =>
{
if (!ValidateInput(i))
{
// Do something to not output anything for this input
}
// Normal output
}
Run Code Online (Sandbox Code Playgroud)
如果那是不可能的,那么实现这一目标的最佳模式是什么?
像下面这样的东西?
BufferBlock<OutputType> output = new BufferBlock<OutputType>();
var ab = new ActionBlock<InputType>(i =>
{
if (ValidateInput(i))
{
output.Post(MyTransform(i));
}
}
Run Code Online (Sandbox Code Playgroud) 我刚刚将Visual Studio 11 Beta升级到新的Visual Studio 2012 RC,并且在引用TPL Dataflow时遇到了问题.
首先,我尝试通过添加框架中的引用,像之前一样引用Dataflow.但是当我尝试这样做时,我得到一个错误框:
无法添加对"System.Threading.Tasks.Dataflow"的引用.
然后整个Visual Studio冻结.
在阅读了用于.NET Framework 4.5 RC的MEF和TPL Dataflow NuGet包之后,我假设在引用列表中显示的Dataflow版本是先前安装的某种工件.所以,我尝试使用NuGet的Dataflow,这似乎有效,直到我真的尝试编译我的代码,因为我收到了一个错误:
类型'System.Threading.Tasks.Task'在未引用的程序集中定义.您必须添加对程序集'System.Threading.Tasks,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b03f5f7f11d50a3a'的引用.
这很令人困惑,因为Task在mscorlib中,不需要其他引用.但是System.Threading.Tasks在引用列表中有一个引用程序集,所以我尝试添加它.不幸的是,一个熟悉的错误表明
无法添加对"System.Threading.Tasks"的引用.
然后Visual Studio再次冻结.
难道我做错了什么?如何在VS 2012 RC中使用TPL Dataflow?
.net task-parallel-library nuget tpl-dataflow visual-studio-2012
我想知道使用链接到一个或多个ActionBlocks的BufferBlock是否有好处,除了限制(使用BoundedCapacity),而不是直接发布到ActionBlock(只要不需要限制).
背景
我有一个应用程序接收定期数据转储(XML文件)并使用Entity Framework 5(代码优先)将它们导入现有数据库.导入通过EF5而不是BULK INSERT或BCP进行,因为必须应用已存在于实体中的业务规则.
处理似乎是应用程序本身的CPU绑定(极快,启用了写缓存的磁盘IO子系统在整个过程中显示几乎为零的磁盘等待时间,而SQL Server显示的CPU时间不超过8%-10%).
为了提高效率,我使用TPL Dataflow构建了一个管道,其组件包括:
Read & Parse XML file
|
V
Create entities from XML Node
|
V
Batch entities (BatchBlock, currently n=200)
|
V
Create new DbContext / insert batched entities / ctx.SaveChanges()
Run Code Online (Sandbox Code Playgroud)
通过这样做,我看到性能大幅提升,但不能使CPU高于60%.
分析
怀疑某种资源争用,我使用VS2012 Profiler的资源争用数据(并发)模式运行该过程.
分析器显示52%的争用标记为句柄2的资源.钻进,我看到创建Handle 2最多争用的方法是
System.Data.Entity.Internal.InternalContext.SaveChanges()
Run Code Online (Sandbox Code Playgroud)
第二名,与SaveChanges()的争论大约是40%
System.Data.Entity.DbSet`1.Add(!0)
Run Code Online (Sandbox Code Playgroud)
问题
UPDATE
对于有问题的运行,调用SaveChanges的任务的最大并行度设置为12(我尝试了各种值,包括先前运行中的Unbounded).
更新2
微软的EF团队提供了反馈意见.请参阅我的答案以获取摘要.
parallel-processing profiling entity-framework task-parallel-library tpl-dataflow
我特别想要在一个或另一个中编写一些信号处理算法,或者两者兼而有之.
表现并不是一个大问题,表达意图的清晰度更为重要.
我想要实现以下'Blocks'并组成它们:
我认为Rx可以被认为是'Linq-to-streams',而TPL是对并发的抽象.我还得到Rx在内部使用TPL来管理其异步位,并且TPL数据流为TPL增加了可组合性.
所以两者都是异步的,都是可组合的,都是相当高的水平(Rx更多).通常和上面的信号处理项目应该在哪里使用?
signal-processing dataflow task-parallel-library system.reactive tpl-dataflow
我期望以下两个发布者产生输出,但它只产生第一个的输出:
var broadcastBlock = new BroadcastBlock<int>(null);
var transformBlock = new TransformBlock<int, int>(i => i*10);
var publish1 = new ActionBlock<int>(i => Console.WriteLine("Publisher 1:" + i));
var publish2 = new ActionBlock<int>(i => Console.WriteLine("Publisher 2:" + i));
broadcastBlock.LinkTo(transformBlock, new DataflowLinkOptions() { PropagateCompletion = true });
transformBlock.LinkTo(publish1, new DataflowLinkOptions() { PropagateCompletion = true });
transformBlock.LinkTo(publish2, new DataflowLinkOptions() { PropagateCompletion = true });
foreach (var i in Enumerable.Range(0, 5))
{
broadcastBlock.Post(i);
}
broadcastBlock.Complete();
Task.WhenAll(publish1.Completion, publish2.Completion).Wait();
Run Code Online (Sandbox Code Playgroud)
我显然在这里缺少一些基本的东西,任何想法?
我只是想学习它们以及如何一起使用它们.我知道他们可以相互补充我只是找不到某人实际做的例子.
.net task-parallel-library system.reactive c#-5.0 tpl-dataflow
我创建了这个来测试一个并行提取:
public static async Task ExtractToDirectoryAsync(this FileInfo file, DirectoryInfo folder)
{
ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
{
var path = Path.Combine(folder.FullName, entry.FullName);
Directory.CreateDirectory(Path.GetDirectoryName(path));
entry.ExtractToFile(path);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
using (var archive = ZipFile.OpenRead(file.FullName))
{
foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
{
block.Post(entry);
}
block.Complete();
await block.Completion;
}
}
Run Code Online (Sandbox Code Playgroud)
以及用于测试的以下单元测试:
[TestMethod]
public async Task ExtractTestAsync()
{
if (Resources.LocalExtractFolder.Exists)
Resources.LocalExtractFolder.Delete(true);
// Resources.LocalExtractFolder.Create();
await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
}
Run Code Online (Sandbox Code Playgroud)
使用MaxDegreeOfParallelism = 1,一切正常,但有两个则没有.
Test Name: ExtractTestAsync
Test FullName: Composite.Azure.Tests.ZipFileTests.ExtractTestAsync …Run Code Online (Sandbox Code Playgroud) tpl-dataflow ×10
.net ×6
c# ×5
async-await ×2
dataflow ×2
asynchronous ×1
c#-5.0 ×1
conceptual ×1
nuget ×1
profiling ×1