我喜欢 TPL 数据流。
嗯,一个有趣的设计选择是,大多数预定义块使用委托来允许我们实现处理逻辑。这在简单的场景中看起来不错。但是让我们考虑一下现实世界的大型应用程序,它需要模块化和封装性。我发现使用 DELEGATE 方法编写结构良好的应用程序既困难又不自然。
例如,如果我想要的只是 aMultiplyIntByTwoTransformBlock和 aNoOpActionBlock作为可重用的类 TYPE (而不是实例)。我如何实现它?我希望我可以从TransformBlock/继承ActionBlock并说,覆盖一些Process()方法来实现这一点。但是预定义的块是密封的。他们只接受代表。
我知道我可以从头开始创建一个自定义块,但显然这对我来说太复杂了,因为我需要的只是在预定义块之上进行一些自定义。
那么,我如何实现我的目标?
更新:我并不是说有些事情代表不能做。我是说在模板方法模式中公开抽象块在许多情况下更好。比如说,我希望我可以编写一个 AbstractMultiplyBlock 和 MultiplyByTwoBlock 和 MultiplyByThreeBlock,利用多态性。遗憾的是,代表们没有提供这种数据和逻辑可重用性。
我有一个方法,它Progress<T>作为参数,并在内部报告进展.
因为进度可能在工作线程上快速发生(例如每秒数百次),我需要缓冲它,然后用接收的记录更新视图模型,例如每半秒一次.
在过去我使用过,Observable.FromEventPattern().Buffer(TimeSpan)所以我看到如果我将Progress<T>报告包装到一个事件中,就可以使用相同的Rx机制.然而,这似乎是一种矫枉过正.有更优雅的解决方案吗?
我也看过TPL Dataflow,BufferBlock但我不确定它是否支持基于时间的缓冲,例如每半秒一次.
如果有人有例子,请发布.非常感谢.
此代码永远不会到达最后一行,因为完成不会从 saveBlock 传播到 sendBlock。我究竟做错了什么?
var readGenerateBlock = new TransformBlock<int, int>(n =>
{
Console.WriteLine("Read " + n);
Thread.Sleep(15);
return n;
});
var groupingBlock = new BatchBlock<int>(10);
var saveBlock = new TransformManyBlock<int[], int>(n =>
{
Console.WriteLine("Saving {0} items [{1}; {2}]", n.Count(), n.First(), n.Last());
Thread.Sleep(150);
return n;
});
var sendBlock = new TransformBlock<int, int>(n =>
{
Console.WriteLine("Sending {0}", n);
Thread.Sleep(25);
return n;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
readGenerateBlock.LinkTo(groupingBlock, new DataflowLinkOptions { PropagateCompletion = true });
groupingBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion …Run Code Online (Sandbox Code Playgroud) 我有以下伪代码:
var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
var a = new ActionBlock<int>(async item =>
{
await Task.Delay(500);
Trace.TraceInformation(
$"Target 1: | Type: {typeof(int).Name} | Thread: {Thread.CurrentThread.ManagedThreadId} | Message: {item}");
// handling some logic but it throws
if (item >= 5) throw new Exception("Something bad happened");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });
queue.LinkTo(a, new DataflowLinkOptions { PropagateCompletion = true });
var targets = new List<ITargetBlock<int>> {queue};
var broadcaster = new ActionBlock<int>( …Run Code Online (Sandbox Code Playgroud) 我有以下 TPL 数据流,当使用谓词过滤从 TransformBlock 传递到 ActionBlock 的项目时,它永远不会完成。
如果谓词对任何项目返回 false,则数据流挂起。
请有人提供一些关于发生了什么以及如何解决这个问题的见解?
// define blocks
var getBlock = new TransformBlock<int, int>(i =>
{
Console.WriteLine($"getBlock: {i}");
return ++i;
});
var writeBlock = new ActionBlock<int>(i =>
{
Console.WriteLine($"writeBlock: {i}");
});
// link blocks
getBlock.LinkTo(writeBlock, new DataflowLinkOptions
{
PropagateCompletion = true
}, i => i == 12); // <-- this predicate prevents the completion of writeBlock
// push to block
var items = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, …Run Code Online (Sandbox Code Playgroud) 我目前正在处理一个从大型二进制文件读取的应用程序,该文件包含数千个文件,每个文件都由应用程序中的其他类处理.该类返回一个对象或null.我想在主表单上显示进度但由于某种原因我无法理解它.
int TotalFound = 0;
var uiScheduler = TaskScheduler.FromCurrentSynchronizationContext;
BufferBlock<file> buffer = new BufferBlock<file>();
DataflowBlockOptions options = new DataflowBlockOptions(){ TaskScheduler = uiScheduler, };
var producer = new ActionBlock<someObject>(largeFile=>{
var file = GetFileFromLargeFile(largeFile);
if(file !=null){
TotalFound++;
buffer.post(file);
lblProgress.Text = String.Format("{0}", TotalFound);
}
}, options);
Run Code Online (Sandbox Code Playgroud)
上面的代码冻结了我的Form,即使我使用"TaskScheduler.FromCurrentSynchronizationContext",为什么?因为当我使用下面的代码时,我的表单更新很好
DataflowBlockOptions options = new DataflowBlockOptions(){ TaskScheduler = uiScheduler, };
var producer = new ActionBlock<someObject>(largeFile=>{
var file = GetFileFromLargeFile(largeFile);
if(file !=null){
Task.Factory.StartNew(() => {
TotalFound++;
buffer.Post(file);
}).ContinueWith(uiTask => {
lblProgress.Text = String.Format("{0}", TotalFound);
},CancellationToken.None, TaskContinuationOptions.None, uiScheduler);
} …Run Code Online (Sandbox Code Playgroud) 我试图更好地理解并行处理的整个概念并设置测试用例.在使用测试之后,我发现使用Dataflow ActionBlock(或TransformBlock)中的异步方法调用不会对性能产生积极影响,它只会使代码复杂化.我是否正确地假设如果我使用Dataflow Blocks,其中的代码不必是异步的,Dataflow将使其本身异步.或者我错过了这一点?
我正在研究TPL数据流.Belwo是官方文件Stephen Toub的两段代码片段.TPL数据流简介(TPLDataflow.docx).但我并没有完全了解它们之间的区别.
顺便说一句,这两个代码片段是来自文档的样本,用于演示目的.他们不完整.
按顺序和同步下载图像
var downloader = new ActionBlock<string>(url =>
{
// Download returns byte[]
byte [] imageData = Download(url);
Process(imageData);
});
downloader.Post("http://msdn.com/concurrency");
downloader.Post("http://blogs.msdn.com/pfxteam");
Run Code Online (Sandbox Code Playgroud)按顺序和异步方式下载映像
var downloader = new ActionBlock<string>(async url =>
{
byte [] imageData = await DownloadAsync(url);
Process(imageData);
});
downloader.Post("http://msdn.com/concurrency ");
downloader.Post("http://blogs.msdn.com/pfxteam");
Run Code Online (Sandbox Code Playgroud)在数据流库的帮助下创建自定义拆分块只需要一些帮助,该库是.Net中TPL的一部分.
我想要实现的只是一个简单的自定义块,它接受输入并将其拆分为多个transformblock.这是过滤数据所必需的,我可以在其中记录否定条目并继续使用好的条目.
根据我的需要,它应该足以将输入分成两个不同的输出.类头应该看起来像这样......
public abstract class SplitBlock<TInput, TOutputLeft, TOutputRight>
Run Code Online (Sandbox Code Playgroud)
我的问题是我不知道该怎么做.我所知道的是我需要两个TransformBlocks:
var leftBlock = new TransformBlock<TInput, TOutputLeft>(...)
var rightblock = new TransformBlock<TInput, TOutputRight>(...)
Run Code Online (Sandbox Code Playgroud)
在我所有的尝试中,我最终有多个ITargetBlocks用于存储左右块的输入,但这不能正确,可以吗?
我很感激你能给予的每一个暗示.
当我创建一个数据流块时,我指定它的.MaxDegreeOfParallelerism如下:
...New ExecutionDataflowBlockOptions With
{.MaxDegreeOfParallelism = System.Environment.ProcessorCount - 1}...
Run Code Online (Sandbox Code Playgroud)
以后有什么方法可以改变吗?
tpl-dataflow ×10
c# ×7
.net ×4
async-await ×2
.net-4.5 ×1
buffering ×1
c#-5.0 ×1
pipeline ×1
wpf ×1