我试图在TPL Dataflow块中完成"完成".特别是,TransformBlock似乎没有完成.为什么?
我的代码计算从1到1000的所有整数的平方.我使用了a BufferBlock和a TransformBlock.稍后在我的代码中,我等待完成TransformBlock.该块永远不会实际完成,我不明白为什么.
static void Main(string[] args)
{
var bufferBlock = new BufferBlock<int>();
var calculatorBlock = new TransformBlock<int, int>(i =>
{
Console.WriteLine("Calculating {0}²", i);
return (int)Math.Pow(i, 2);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
using (bufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
foreach (var number in Enumerable.Range(1, 1000))
{
bufferBlock.Post(number);
}
bufferBlock.Complete();
// This line never completes
calculatorBlock.Completion.Wait();
// Unreachable code
IList<int> results;
if (calculatorBlock.TryReceiveAll(out results))
{
foreach …Run Code Online (Sandbox Code Playgroud) 我已阅读了许多技术文档,其中包括一些Microsoft团队或其他作者详细介绍了新TPL Dataflow库的功能,async/await并发框架和TPL.但是,我并没有真正遇到任何明确描述使用时间的内容.我知道每个都有自己的位置和适用性,但我特别想知道以下情况:
我有一个完全在进程中运行的数据流模型.在顶部是数据生成组件(A),它生成数据并通过数据流块链接或通过将事件提升到处理组件(B)来传递数据.(B)中的某些部分必须同步运行,而(A)大量受益于并行性,因为大多数进程是I/O或CPU绑定的(从磁盘读取二进制数据,然后对它们进行反序列化和排序).最后,处理组件(B)将转换后的结果传递给(C)以供进一步使用.
我特别想知道何时使用任务,async/await和TPL数据流块以下内容:
启动数据生成组件(A).我显然不想锁定gui /仪表板,因此这个过程必须在某个不同的线程/任务上运行.
如何在(A),(B)和(C)中调用不直接参与数据生成和处理过程但执行可能需要几百毫秒/秒返回的配置工作的方法.我的预感是,这是async/await闪耀的地方?
我最挣扎的是如何最好地设计从一个组件传递到下一个组件的消息.TPL Dataflow看起来非常有趣,但有时候我的目的太慢了.(最后请注意性能问题).如果不使用TPL Dataflow,如何通过进程间任务/并发数据传递实现响应性和并发性?例如,如果我在一个任务中引发一个事件,订阅的事件处理程序在同一个任务中运行而不是传递给另一个任务,那么对吗?总之,组件(A)在将数据传递给组件(B)之后如何进行业务,而组件(B)检索数据并专注于处理数据?这里最好使用哪种并发模型?我在这里实现了数据流块,但这真的是最好的方法吗?
我想总结上面的要点是我在如何使用标准实践设计和实现API类型组件方面的努力?方法是否应设计为异步,数据输入为数据流块,数据输出为数据流块还是事件?一般来说最好的方法是什么?我问,因为上面提到的大多数组件都应该独立工作,因此它们基本上可以在内部交换或独立更改,而无需重新编写访问器和输出.
关于性能的注意事项:我提到TPL Dataflow块有时很慢.我处理高吞吐量,低延迟类型的应用程序和目标磁盘I/O限制,因此tpl数据流块通常比例如同步处理单元执行得慢得多.问题是我不知道如何将进程嵌入到自己的任务或并发模型中,以实现与tpl数据流块已经处理的类似的东西,但没有tpl df带来的开销.
我正在寻找TPL数据流库的.NET 4.0版本.
Nuget包有一个4.0版本的库,但它似乎以.NET 4.5为目标.
我发现了4.0版本的各种引用,就像在这个论坛中一样:
但是提到的链接只是将我重定向到库的Nuget页面.
有谁知道我在哪里可以找到一个针对.NET 4.0的工作版本?
我们假设我有一个简单的 ActionBlock<int>
var actionBlock = new ActionBlock<int>(_ => Console.WriteLine(_));
Run Code Online (Sandbox Code Playgroud)
我可以指定一个有限的容量来启用缓冲:
var actionBlock = new ActionBlock<int>(
_ => Console.WriteLine(_),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 1000
});
Run Code Online (Sandbox Code Playgroud)
创建一个BufferBlock<T>并将其链接到actionBlock更好,它是相同的,还是多余的?
我正在尝试使用实现数据处理管道TPL Dataflow.但是,我对数据流相对较新,并不完全确定如何正确使用它来解决我想要解决的问题.
问题:
我试图遍历文件列表并处理每个文件以读取一些数据,然后进一步处理该数据.每个文件大致700MB以1GB大小.每个文件都包含JSON数据.为了并行处理这些文件,而不是的运行内存,我试图使用IEnumerable<>与yield return再进一步处理数据.
获得文件列表后,我希望一次最多处理4-5个文件.我的困惑来自:
IEnumerable<>和yeild return使用async/await和数据流.碰上了这个答案由svick,但仍然不知道如何转换IEnumerable<>到ISourceBlock,然后所有块连接在一起,并跟踪完成.producer将非常快(通过文件列表),但consumer将非常慢(处理每个文件 - 读取数据,反序列化JSON).在这种情况下,如何跟踪完成情况.LinkTo数据块的功能来连接各种块吗?或者使用诸如OutputAvailableAsync()和之类的方法ReceiveAsync()将数据从一个块传播到另一个块.代码:
private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;
public Task ProduceAsync()
{
PrepareDataflow(token);
var bufferTask = ListFilesAsync(_fileBufferBlock, token);
var tasks = …Run Code Online (Sandbox Code Playgroud) 我在不同的任务上运行一个非常典型的生产者/消费者模型
Task1:从二进制文件中读取一批byte [],并为每个字节数组集合启动一个新任务.(该操作是为了内存管理目的而批量处理的).
任务2-n:这些是工作任务,每个都在字节数组的传入集合(来自Tasks1)上运行,并对字节数组进行反序列化,按特定条件对它们进行排序,然后存储结果对象的集合(每个字节数组)在"并发字典"中反序列化为此类对象.
任务(n + 1)我选择了并发字典,因为此任务的工作是以与它们来自Task1的顺序相同的顺序合并存储在并发字典中的那些集合.我通过传递一个collectionID(它是int类型并为Task1中的每个新集合递增)从Task1到此任务一直向下实现.此任务基本上检查下一个预期的collectionID是否已存储在并发字典中,如果是,则将其取出,将其添加到Final Queue并检查并发字典中的下一个集合.
现在,从我所看到的和我观看的视频看来,TPL Dataflow可能是这种生产者/消费者模型的完美候选者.我似乎无法设计并因此开始,因为我从未使用过TPL Dataflow.在吞吐量和延迟方面,这个库甚至可以完成任务吗?我目前处理250万字节数组,因此在生成的集合中每秒处理对象.TPL Dataflow可以帮助简化吗?我对以下问题的答案特别感兴趣:TPL Dataflow可以在产生工作任务时保留Task1中的集合批次的顺序,并在工作任务完成后重新合并它们吗?它是否优化了什么?在对整个结构进行分析之后,我觉得由于旋转和涉及太多并发集合而浪费了相当多的时间.
任何想法,想法?
c# concurrency producer-consumer task-parallel-library tpl-dataflow
我需要在工作流程中引入重试策略.假设有3个块以这种方式连接:
var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);
buffer.LinkTo(processing);
processing.LinkTo(send);
Run Code Online (Sandbox Code Playgroud)
因此,有一个缓冲区累积数据,然后将其发送到转换块,该转换块一次处理不超过3个项目,然后将结果发送到操作块.
在处理过程中可能会出现变换块瞬态错误,如果错误是瞬态错误,我想重试该块.
我知道块通常不可重试(传递到块中的委托可以重试).其中一个选项是包装传递给支持重试的委托.
我也知道有一个非常好的库TransientFaultHandling.Core可以为瞬态故障提供重试机制.这是一个很棒的图书馆,但不是我的情况.如果我将传递给转换块的委托包装到RetryPolicy.ExecuteAsync方法中,则转换块内的消息将被锁定,并且在重试完成或失败之前,转换块将无法接收新消息.想象一下,如果所有3条消息都输入到重试中(假设下次重试尝试将在2分钟内完成)并且失败,则变换块将被卡住,直到至少有一条消息离开变换块.
我看到的唯一的解决办法是延长TranformBlock(实际上,ITargetBlock将有足够太),和做手工重试(比如从这里):
do
{
try { return await transform(input); }
catch
{
if( numRetries <= 0 ) throw;
else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
}
} while( numRetries-- > 0 );
Run Code Online (Sandbox Code Playgroud)
ig将消息再次放入变换块中,但是在这种情况下,重试上下文(剩余的重试次数等)也应该传递给该块.听起来太复杂了......
有没有人看到更简单的方法来实现工作流程块的重试策略?
我正在为.NET建立一个AKKA框架的端口(现在不要太认真了,现在这是一个周末黑客的演员部分)
我对其中的"未来"支持存在一些问题.在Java/Scala Akka中,期货将与Await电话同步等待.很像.NET Task.Wait()
我的目标是支持真正的异步等待.它现在可以工作,但是在我当前的解决方案中,继续在错误的线程上执行.
这是将消息传递给我的一个包含未来await块的actor的结果.如您所见,actor总是在同一个线程上执行,而await块在随机线程池线程上执行.
actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...
Run Code Online (Sandbox Code Playgroud)
actor使用DataFlow获取消息BufferBlock<Message>
或者更确切地说,我在缓冲区块上使用RX来订阅消息.它配置如下:
var messages = new BufferBlock<Message>()
{
BoundedCapacity = 100,
TaskScheduler = TaskScheduler.Default,
};
messages.AsObservable().Subscribe(this);
Run Code Online (Sandbox Code Playgroud)
到现在为止还挺好.
但是,当我等待未来的结果时.像这样:
protected override void OnReceive(IMessage message)
{
....
var result = await Ask(logger, m);
// This is not executed on the same thread as the above code
result.Match()
.With<SomeMessage>(t => {
Console.WriteLine("await thread {0}", …Run Code Online (Sandbox Code Playgroud) 我尝试使用系统资源的最佳使用来创建设计良好的TPL数据流管道.我的项目是一个HTML解析器,它将解析后的值添加到SQL Server DB中.我已经拥有了我未来管道的所有方法,现在我的问题是将它们放在Dataflow块中的最佳方法是什么,以及我应该使用多少块?一些方法是CPU绑定的,其中一些方法 - I/O绑定(从Internet加载,SQL Server数据库查询).现在我认为将每个I/O操作放在单独的块中就像这个方案一样正确:

在这种情况下设计管道的基本规则是什么?
我正在设计一个由多个块组成的长时间运行的Dataflow管道.项目被输入到管道的输入块,最终通过它,并在最后的UI中显示(作为对用户的礼貌 - 管道的真正工作是将处理结果保存到磁盘).
由于各种原因(输入错误,网络故障,计算错误等),管道块内的lambda函数可能会抛出异常.在这种情况下,我不想破坏整个管道,而是想要触发违规项目,并在"错误"下的UI中显示它.
最好的方法是什么?我知道我可以在try/catch中包装每个lambda函数:
var errorLoggingBlock = new ActionBlock<Tuple<WorkItem, Exception>>(...)
var workerBlock = new TransformBlock<WorkItem, WorkItem>(item =>
{
try {
return DoStuff(item);
} catch (Exception ex) {
errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
return null;
}
}
Run Code Online (Sandbox Code Playgroud)
但是我在管道中有大约10个块,并且将代码复制/粘贴到每个块中似乎很愚蠢.此外,我不喜欢返回null的想法,因为现在所有下游块都必须检查它.
我的下一个最好的想法是创建一个函数,返回一个为我做包装的lambda:
private Func<TArg, TResult> HandleErrors<TArg, TResult>(Func<TArg, TResult> f) where TArg:WorkItem
{
return arg =>
{
try {
return f(arg);
} catch (Exception ex) {
errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
return default(TResult);
}
};
}
Run Code Online (Sandbox Code Playgroud)
但这似乎有点过分.有没有更好的办法 ?
tpl-dataflow ×10
c# ×7
.net ×4
async-await ×3
.net-4.0 ×1
akka ×1
concurrency ×1
ienumerable ×1
nuget ×1
pipeline ×1
yield-return ×1