标签: tpl-dataflow

I/O性能 - 异步与TPL对比数据流与RX

我有一块C#5.0代码,可以生成大量的网络和磁盘I/O. 我需要并行运行此代码的多个副本.以下哪种技术可能会给我带来最佳性能:

  • 等待的异步方法

  • 直接使用TPL中的Task

  • TPL数据流nuget

  • 反应性扩展

我不太擅长这种平行的东西,但如果使用较低的杠杆,比如说Thread,可以给我更好的性能,我也会考虑.

conceptual task-parallel-library system.reactive async-await tpl-dataflow

18
推荐指数
3
解决办法
7426
查看次数

表观BufferBlock.Post/Receive/ReceiveAsync种族/错误

交叉发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

我知道......我并没有真正使用TplDataflow来发挥它的最大潜力.ATM我只是BufferBlock用作消息传递的安全队列,其中生产者和消费者以不同的速率运行.我看到一些奇怪的行为,让我难以理解如何继续.

private BufferBlock<object> messageQueue = new BufferBlock<object>();

public void Send(object message)
{
    var accepted=messageQueue.Post(message);
    logger.Info("Send message was called qlen = {0} accepted={1}",
    messageQueue.Count,accepted);
}

public async Task<object> GetMessageAsync()
{
    try
    {
        var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
        //despite messageQueue.Count>0 next line 
        //occasionally does not execute
        logger.Info("message received");
        //.......
    }
    catch(TimeoutException)
    {
        //do something
    }
}
Run Code Online (Sandbox Code Playgroud)

在上面的代码中(它是2000行分布式解决方案的一部分),Send每100ms左右定期调用一次.这意味着一个项目被Post编到messageQueue在约10次.这已经过验证.但是,偶尔看起来ReceiveAsync在超时内没有完成(即Post没有导致ReceiveAsync完成)并且TimeoutException在30秒后被提升.在这一点上,messageQueue.Count是数百.这是出乎意料的.这个问题已经在较慢的发布率(1个帖子/秒)中观察到,并且通常在1000个项目通过之前发生BufferBlock.

因此,要解决此问题,我使用以下代码,它可以工作,但偶尔会在接收时导致1秒延迟(由于上面发生的错误)

    public async …
Run Code Online (Sandbox Code Playgroud)

c# dataflow task-parallel-library async-await tpl-dataflow

17
推荐指数
1
解决办法
1727
查看次数

跳过Dataflow TransformBlock中的项目

TPL Dataflow提供TransformBlock转换输入,例如:

var tb = new TransformBlock<int, int>(i => i * 2);
Run Code Online (Sandbox Code Playgroud)

是否有可能不输出一些输入,例如,如果输入未通过某些验证测试?

var tb = new TransformBlock<InputType, OutputType>(i =>
{
    if (!ValidateInput(i))
    {
        // Do something to not output anything for this input
    }
    // Normal output
}
Run Code Online (Sandbox Code Playgroud)

如果那是不可能的,那么实现这一目标的最佳模式是什么?
像下面这样的东西?

BufferBlock<OutputType> output = new BufferBlock<OutputType>();

var ab = new ActionBlock<InputType>(i =>
{
    if (ValidateInput(i)) 
    {
        output.Post(MyTransform(i));
    }
}
Run Code Online (Sandbox Code Playgroud)

.net c# task-parallel-library tpl-dataflow

17
推荐指数
3
解决办法
4353
查看次数

在VS 2012 RC中引用TPL数据流和TPL的问题

我刚刚将Visual Studio 11 Beta升级到新的Visual Studio 2012 RC,并且在引用TPL Dataflow时遇到了问题.

首先,我尝试通过添加框架中的引用,像之前一样引用Dataflow.但是当我尝试这样做时,我得到一个错误框:

无法添加对"System.Threading.Tasks.Dataflow"的引用.

然后整个Visual Studio冻结.

在阅读了用于.NET Framework 4.5 RC的MEF和TPL Dataflow NuGet包之后,我假设在引用列表中显示的Dataflow版本是先前安装的某种工件.所以,我尝试使用NuGet的Dataflow,这似乎有效,直到我真的尝试编译我的代码,因为我收到了一个错误:

类型'System.Threading.Tasks.Task'在未引用的程序集中定义.您必须添加对程序集'System.Threading.Tasks,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b03f5f7f11d50a3a'的引用.

这很令人困惑,因为Task在mscorlib中,不需要其他引用.但是System.Threading.Tasks在引用列表中有一个引用程序集,所以我尝试添加它.不幸的是,一个熟悉的错误表明

无法添加对"System.Threading.Tasks"的引用.

然后Visual Studio再次冻结.

难道我做错了什么?如何在VS 2012 RC中使用TPL Dataflow?

.net task-parallel-library nuget tpl-dataflow visual-studio-2012

15
推荐指数
1
解决办法
1万
查看次数

在数据流网络中使用BufferBlock <T>的好处

我想知道使用链接到一个或多个ActionBlocks的BufferBlock是否有好处,除了限制(使用BoundedCapacity),而不是直接发布到ActionBlock(只要不需要限制).

.net c# task-parallel-library tpl-dataflow

14
推荐指数
3
解决办法
9905
查看次数

实体框架与并行

背景

我有一个应用程序接收定期数据转储(XML文件)并使用Entity Framework 5(代码优先)将它们导入现有数据库.导入通过EF5而不是BULK INSERT或BCP进行,因为必须应用已存在于实体中的业务规则.

处理似乎是应用程序本身的CPU绑定(极快,启用了写缓存的磁盘IO子系统在整个过程中显示几乎为零的磁盘等待时间,而SQL Server显示的CPU时间不超过8%-10%).

为了提高效率,我使用TPL Dataflow构建了一个管道,其组件包括:

Read & Parse XML file
        |
        V
Create entities from XML Node
        |
        V
Batch entities (BatchBlock, currently n=200)
        |
        V
Create new DbContext / insert batched entities / ctx.SaveChanges()
Run Code Online (Sandbox Code Playgroud)

通过这样做,我看到性能大幅提升,但不能使CPU高于60%.

分析

怀疑某种资源争用,我使用VS2012 Profiler的资源争用数据(并发)模式运行该过程.

分析器显示52%的争用标记为句柄2的资源.钻进,我看到创建Handle 2最多争用的方法是

System.Data.Entity.Internal.InternalContext.SaveChanges()
Run Code Online (Sandbox Code Playgroud)

第二名,与SaveChanges()的争论大约是40%

System.Data.Entity.DbSet`1.Add(!0)
Run Code Online (Sandbox Code Playgroud)

问题

  • 我怎样才能弄清楚Handle 2究竟是什么(例如TPL的一部分,EF的一部分)?
  • EF限制调用将DbContext实例与单独的线程分开吗?似乎他们正在争夺一个共享资源.
  • 在这种情况下,我有什么办法可以改善并行性吗?

UPDATE

对于有问题的运行,调用SaveChanges的任务的最大并行度设置为12(我尝试了各种值,包括先前运行中的Unbounded).

更新2

微软的EF团队提供了反馈意见.请参阅我的答案以获取摘要.

parallel-processing profiling entity-framework task-parallel-library tpl-dataflow

14
推荐指数
1
解决办法
3809
查看次数

TPL数据流优于反应性扩展(Rx)的用例有哪些

我特别想要在一个或另一个中编写一些信号处理算法,或者两者兼而有之.

表现并不是一个大问题,表达意图的清晰度更为重要.

我想要实现以下'Blocks'并组成它们:

  • 滤波器(FIR和IIR)
  • 相位检测器
  • 集成
  • 搅拌机
  • 函数发生器
  • PLL(使用上面的构建块)

我认为Rx可以被认为是'Linq-to-streams',而TPL是对并发的抽象.我还得到Rx在内部使用TPL来管理其异步位,并且TPL数据流为TPL增加了可组合性.

所以两者都是异步的,都是可组合的,都是相当高的水平(Rx更多).通常和上面的信号处理项目应该在哪里使用?

signal-processing dataflow task-parallel-library system.reactive tpl-dataflow

13
推荐指数
1
解决办法
2472
查看次数

如何在TPL数据流中将多个目标块与源块链接?

我期望以下两个发布者产生输出,但它只产生第一个的输出:

var broadcastBlock = new BroadcastBlock<int>(null);
var transformBlock = new TransformBlock<int, int>(i => i*10);
var publish1 = new ActionBlock<int>(i => Console.WriteLine("Publisher 1:" + i));
var publish2 = new ActionBlock<int>(i => Console.WriteLine("Publisher 2:" + i));

broadcastBlock.LinkTo(transformBlock, new DataflowLinkOptions() { PropagateCompletion = true });
transformBlock.LinkTo(publish1, new DataflowLinkOptions() { PropagateCompletion = true });
transformBlock.LinkTo(publish2, new DataflowLinkOptions() { PropagateCompletion = true });

foreach (var i in Enumerable.Range(0, 5))
{
    broadcastBlock.Post(i);
}
broadcastBlock.Complete();
Task.WhenAll(publish1.Completion, publish2.Completion).Wait();
Run Code Online (Sandbox Code Playgroud)

我显然在这里缺少一些基本的东西,任何想法?

.net c# task-parallel-library tpl-dataflow

13
推荐指数
1
解决办法
3221
查看次数

TPL Dataflow和Rx组合示例

我只是想学习它们以及如何一起使用它们.我知道他们可以相互补充我只是找不到某人实际做的例子.

.net task-parallel-library system.reactive c#-5.0 tpl-dataflow

12
推荐指数
1
解决办法
2916
查看次数

我做错了什么或者不能并行提取zip文件?

我创建了这个来测试一个并行提取:

    public static async Task ExtractToDirectoryAsync(this FileInfo file, DirectoryInfo folder)
    {

        ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
        {
            var path = Path.Combine(folder.FullName, entry.FullName);

            Directory.CreateDirectory(Path.GetDirectoryName(path));
            entry.ExtractToFile(path);

        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        using (var archive = ZipFile.OpenRead(file.FullName))
        {
            foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
            {
                block.Post(entry);
            }
            block.Complete();
            await block.Completion;
        }

    }
Run Code Online (Sandbox Code Playgroud)

以及用于测试的以下单元测试:

    [TestMethod]
    public async Task ExtractTestAsync()
    {
        if (Resources.LocalExtractFolder.Exists)
            Resources.LocalExtractFolder.Delete(true);
        //  Resources.LocalExtractFolder.Create();
        await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
    }
Run Code Online (Sandbox Code Playgroud)

使用MaxDegreeOfParallelism = 1,一切正常,但有两个则没有.

Test Name:  ExtractTestAsync
Test FullName:  Composite.Azure.Tests.ZipFileTests.ExtractTestAsync …
Run Code Online (Sandbox Code Playgroud)

.net c# asynchronous tpl-dataflow

12
推荐指数
1
解决办法
9138
查看次数