标签: tpl-dataflow

在使用 TPL 数据流的预定义块之上创建可重用的处理逻辑?

我喜欢 TPL 数据流。

嗯,一个有趣的设计选择是,大多数预定义块使用委托来允许我们实现处理逻辑。这在简单的场景中看起来不错。但是让我们考虑一下现实世界的大型应用程序,它需要模块化和封装性。我发现使用 DELEGATE 方法编写结构良好的应用程序既困难又不自然。

例如,如果我想要的只是 aMultiplyIntByTwoTransformBlock和 aNoOpActionBlock作为可重用的类 TYPE (而不是实例)。我如何实现它?我希望我可以从TransformBlock/继承ActionBlock并说,覆盖一些Process()方法来实现这一点。但是预定义的块是密封的。他们只接受代表。

我知道我可以从头开始创建一个自定义块,但显然这对我来说太复杂了,因为我需要的只是在预定义块之上进行一些自定义。

那么,我如何实现我的目标?

更新:我并不是说有些事情代表不能做。我是说在模板方法模式中公开抽象块在许多情况下更好。比如说,我希望我可以编写一个 AbstractMultiplyBlock 和 MultiplyByTwoBlock 和 MultiplyByThreeBlock,利用多态性。遗憾的是,代表们没有提供这种数据和逻辑可重用性。

c# task-parallel-library .net-4.5 tpl-dataflow

3
推荐指数
1
解决办法
1976
查看次数

进行<T>缓冲WPF更新

我有一个方法,它Progress<T>作为参数,并在内部报告进展.

因为进度可能在工作线程上快速发生(例如每秒数百次),我需要缓冲它,然后用接收的记录更新视图模型,例如每半秒一次.

在过去我使用过,Observable.FromEventPattern().Buffer(TimeSpan)所以我看到如果我将Progress<T>报告包装到一个事件中,就可以使用相同的Rx机制.然而,这似乎是一种矫枉过正.有更优雅的解决方案吗?

我也看过TPL Dataflow,BufferBlock但我不确定它是否支持基于时间的缓冲,例如每半秒一次.

如果有人有例子,请发布.非常感谢.

c# wpf buffering system.reactive tpl-dataflow

3
推荐指数
1
解决办法
281
查看次数

C# TPL 数据流 - 完成不起作用

此代码永远不会到达最后一行,因为完成不会从 saveBlock 传播到 sendBlock。我究竟做错了什么?

var readGenerateBlock = new TransformBlock<int, int>(n =>
    {
        Console.WriteLine("Read " + n);
        Thread.Sleep(15);
        return n;
    }); 
var groupingBlock = new BatchBlock<int>(10);
var saveBlock = new TransformManyBlock<int[], int>(n =>
    {
        Console.WriteLine("Saving {0} items [{1}; {2}]", n.Count(), n.First(), n.Last());
        Thread.Sleep(150);
        return n;
    }); 
var sendBlock = new TransformBlock<int, int>(n =>
    {
        Console.WriteLine("Sending {0}", n);
        Thread.Sleep(25);
        return n;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 }); 

readGenerateBlock.LinkTo(groupingBlock, new DataflowLinkOptions { PropagateCompletion = true });
groupingBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion …
Run Code Online (Sandbox Code Playgroud)

c# pipeline task-parallel-library tpl-dataflow

3
推荐指数
1
解决办法
813
查看次数

TPL数据流和下游块中的异常处理

我有以下伪代码:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
var a = new ActionBlock<int>(async item =>
    {
        await Task.Delay(500);
        Trace.TraceInformation(
            $"Target 1: | Type: {typeof(int).Name} | Thread: {Thread.CurrentThread.ManagedThreadId} | Message: {item}");
        // handling some logic but it throws
        if (item >= 5) throw new Exception("Something bad happened");

    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });

queue.LinkTo(a, new DataflowLinkOptions { PropagateCompletion = true });

var targets = new List<ITargetBlock<int>> {queue};

var broadcaster = new ActionBlock<int>( …
Run Code Online (Sandbox Code Playgroud)

c# exception-handling task-parallel-library tpl-dataflow

3
推荐指数
1
解决办法
2025
查看次数

使用谓词时 TPL 数据流永远不会完成

我有以下 TPL 数据流,当使用谓词过滤从 TransformBlock 传递到 ActionBlock 的项目时,它永远不会完成。

如果谓词对任何项目返回 false,则数据流挂起。

请有人提供一些关于发生了什么以及如何解决这个问题的见解?

// define blocks 
var getBlock = new TransformBlock<int, int>(i =>
{
    Console.WriteLine($"getBlock: {i}");

    return ++i;
});

var writeBlock = new ActionBlock<int>(i =>
{
    Console.WriteLine($"writeBlock: {i}");
});

// link blocks
getBlock.LinkTo(writeBlock, new DataflowLinkOptions
{
    PropagateCompletion = true
}, i => i == 12); // <-- this predicate prevents the completion of writeBlock

// push to block 
var items = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, …
Run Code Online (Sandbox Code Playgroud)

tpl-dataflow

3
推荐指数
1
解决办法
583
查看次数

FromCurrentSynchronizationContext,我错过了什么吗?

我目前正在处理一个从大型二进制文件读取的应用程序,该文件包含数千个文件,每个文件都由应用程序中的其他类处理.该类返回一个对象或null.我想在主表单上显示进度但由于某种原因我无法理解它.

int TotalFound = 0;
var uiScheduler = TaskScheduler.FromCurrentSynchronizationContext;
BufferBlock<file> buffer = new BufferBlock<file>();
DataflowBlockOptions options = new DataflowBlockOptions(){ TaskScheduler = uiScheduler, }; 
var producer = new ActionBlock<someObject>(largeFile=>{ 
    var file = GetFileFromLargeFile(largeFile);
    if(file !=null){
       TotalFound++;
       buffer.post(file);
       lblProgress.Text = String.Format("{0}", TotalFound);
    }
}, options);
Run Code Online (Sandbox Code Playgroud)

上面的代码冻结了我的Form,即使我使用"TaskScheduler.FromCurrentSynchronizationContext",为什么?因为当我使用下面的代码时,我的表单更新很好

DataflowBlockOptions options = new DataflowBlockOptions(){ TaskScheduler = uiScheduler, }; 
var producer = new ActionBlock<someObject>(largeFile=>{ 
    var file = GetFileFromLargeFile(largeFile);
    if(file !=null){
       Task.Factory.StartNew(() => {
          TotalFound++;
          buffer.Post(file);
       }).ContinueWith(uiTask => {
          lblProgress.Text = String.Format("{0}", TotalFound);
       },CancellationToken.None, TaskContinuationOptions.None, uiScheduler);           
    } …
Run Code Online (Sandbox Code Playgroud)

c# user-interface task-parallel-library tpl-dataflow

2
推荐指数
1
解决办法
3841
查看次数

TPL数据流和异步方法调用

我试图更好地理解并行处理的整个概念并设置测试用例.在使用测试之后,我发现使用Dataflow ActionBlock(或TransformBlock)中的异步方法调用不会对性能产生积极影响,它只会使代码复杂化.我是否正确地假设如果我使用Dataflow Blocks,其中的代码不必是异步的,Dataflow将使其本身异步.或者我错过了这一点?

.net c# task-parallel-library async-await tpl-dataflow

2
推荐指数
1
解决办法
3241
查看次数

TPL数据流:这两个代码片段之间的区别是什么?

我正在研究TPL数据流.Belwo是官方文件Stephen Toub的两段代码片段.TPL数据流简介(TPLDataflow.docx).但我并没有完全了解它们之间的区别.

顺便说一句,这两个代码片段是来自文档的样本,用于演示目的.他们不完整.

  1. 按顺序和同步下载图像

    var downloader = new ActionBlock<string>(url =>
    {
        // Download returns byte[]
        byte [] imageData = Download(url); 
        Process(imageData);
    });
    
    downloader.Post("http://msdn.com/concurrency");
    downloader.Post("http://blogs.msdn.com/pfxteam");
    
    Run Code Online (Sandbox Code Playgroud)
  2. 按顺序和异步方式下载映像

    var downloader = new ActionBlock<string>(async url =>
    {
        byte [] imageData = await DownloadAsync(url);
        Process(imageData);
    });
    
    downloader.Post("http://msdn.com/concurrency ");
    downloader.Post("http://blogs.msdn.com/pfxteam");
    
    Run Code Online (Sandbox Code Playgroud)

.net task-parallel-library async-await c#-5.0 tpl-dataflow

2
推荐指数
1
解决办法
370
查看次数

TPL DataFlow:创建自定义拆分块

在数据流库的帮助下创建自定义拆分块只需要一些帮助,该库是.Net中TPL的一部分.

我想要实现的只是一个简单的自定义块,它接受输入并将其拆分为多个transformblock.这是过滤数据所必需的,我可以在其中记录否定条目并继续使用好的条目.

根据我的需要,它应该足以将输入分成两个不同的输出.类头应该看起来像这样......

public abstract class SplitBlock<TInput, TOutputLeft, TOutputRight>
Run Code Online (Sandbox Code Playgroud)

我的问题是我不知道该怎么做.我所知道的是我需要两个TransformBlocks:

var leftBlock  = new TransformBlock<TInput, TOutputLeft>(...)
var rightblock = new TransformBlock<TInput, TOutputRight>(...)
Run Code Online (Sandbox Code Playgroud)

在我所有的尝试中,我最终有多个ITargetBlocks用于存储左右块的输入,但这不能正确,可以吗?

我很感激你能给予的每一个暗示.

.net task-parallel-library tpl-dataflow

2
推荐指数
1
解决办法
1157
查看次数

动态更改TPL数据流阻止.MaxDegreeOfParallelerism

当我创建一个数据流块时,我指定它的.MaxDegreeOfParallelerism如下:

...New ExecutionDataflowBlockOptions With 
   {.MaxDegreeOfParallelism = System.Environment.ProcessorCount - 1}...
Run Code Online (Sandbox Code Playgroud)

以后有什么方法可以改变吗?

.net c# task-parallel-library tpl-dataflow

2
推荐指数
1
解决办法
455
查看次数