标签: tpl-dataflow

TransformBlock永远不会完成

我试图在TPL Dataflow块中完成"完成".特别是,TransformBlock似乎没有完成.为什么?

示例程序

我的代码计算从1到1000的所有整数的平方.我使用了a BufferBlock和a TransformBlock.稍后在我的代码中,我等待完成TransformBlock.该块永远不会实际完成,我不明白为什么.

static void Main(string[] args)
{
    var bufferBlock = new BufferBlock<int>();
    var calculatorBlock = new TransformBlock<int, int>(i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        return (int)Math.Pow(i, 2);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

    using (bufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        foreach (var number in Enumerable.Range(1, 1000))
        {
            bufferBlock.Post(number);
        }

        bufferBlock.Complete();

        // This line never completes
        calculatorBlock.Completion.Wait();

        // Unreachable code
        IList<int> results;
        if (calculatorBlock.TryReceiveAll(out results))
        {
            foreach …
Run Code Online (Sandbox Code Playgroud)

.net c# task-parallel-library tpl-dataflow

12
推荐指数
2
解决办法
3226
查看次数

任务与TPL数据流与异步/等待,何时使用?

我已阅读了许多技术文档,其中包括一些Microsoft团队或其他作者详细介绍了新TPL Dataflow库的功能,async/await并发框架和TPL.但是,我并没有真正遇到任何明确描述使用时间的内容.我知道每个都有自己的位置和适用性,但我特别想知道以下情况:

我有一个完全在进程中运行的数据流模型.在顶部是数据生成组件(A),它生成数据并通过数据流块链接或通过将事件提升到处理组件(B)来传递数据.(B)中的某些部分必须同步运行,而(A)大量受益于并行性,因为大多数进程是I/O或CPU绑定的(从磁盘读取二进制数据,然后对它们进行反序列化和排序).最后,处理组件(B)将转换后的结果传递给(C)以供进一步使用.

我特别想知道何时使用任务,async/await和TPL数据流块以下内容:

  • 启动数据生成组件(A).我显然不想锁定gui /仪表板,因此这个过程必须在某个不同的线程/任务上运行.

  • 如何在(A),(B)和(C)中调用不直接参与数据生成和处理过程但执行可能需要几百毫秒/秒返回的配置工作的方法.我的预感是,这是async/await闪耀的地方?

  • 我最挣扎的是如何最好地设计从一个组件传递到下一个组件的消息.TPL Dataflow看起来非常有趣,但有时候我的目的太慢了.(最后请注意性能问题).如果不使用TPL Dataflow,如何通过进程间任务/并发数据传递实现响应性和并发性?例如,如果我在一个任务中引发一个事件,订阅的事件处理程序在同一个任务中运行而不是传递给另一个任务,那么对吗?总之,组件(A)在将数据传递给组件(B)之后如何进行业务,而组件(B)检索数据并专注于处理数据?这里最好使用哪种并发模型?我在这里实现了数据流块,但这真的是最好的方法吗?

  • 我想总结上面的要点是我在如何使用标准实践设计和实现API类型组件方面的努力?方法是否应设计为异步,数据输入为数据流块,数据输出为数据流块还是事件?一般来说最好的方法是什么?我问,因为上面提到的大多数组件都应该独立工作,因此它们基本上可以在内部交换或独立更改,而无需重新编写访问器和输出.

关于性能的注意事项:我提到TPL Dataflow块有时很慢.我处理高吞吐量,低延迟类型的应用程序和目标磁盘I/O限制,因此tpl数据流块通常比例如同步处理单元执行得慢得多.问题是我不知道如何将进程嵌入到自己的任务或并发模型中,以实现与tpl数据流块已经处理的类似的东西,但没有tpl df带来的开销.

task-parallel-library async-await tpl-dataflow

11
推荐指数
1
解决办法
3636
查看次数

哪里可以找到4.0的TPL数据流版本?

我正在寻找TPL数据流库的.NET 4.0版本.

Nuget包有一个4.0版本的库,但它似乎以.NET 4.5为目标.

我发现了4.0版本的各种引用,就像在这个论坛中一样:

http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/6206c714-6dee-4d17-a880-26d0c137a167

但是提到的链接只是将我重定向到库的Nuget页面.

有谁知道我在哪里可以找到一个针对.NET 4.0的工作版本?

.net .net-4.0 task-parallel-library nuget tpl-dataflow

11
推荐指数
1
解决办法
4323
查看次数

DataflowBlockOptions.BoundedCapacity和BufferBlock <T>之间的区别

我们假设我有一个简单的 ActionBlock<int>

var actionBlock = new ActionBlock<int>(_ => Console.WriteLine(_));
Run Code Online (Sandbox Code Playgroud)

我可以指定一个有限的容量来启用缓冲:

var actionBlock = new ActionBlock<int>(
    _ => Console.WriteLine(_),
    new ExecutionDataflowBlockOptions
    { 
        BoundedCapacity = 1000
    });
Run Code Online (Sandbox Code Playgroud)

创建一个BufferBlock<T>并将其链接到actionBlock更好,它是相同的,还是多余的?

.net c# task-parallel-library tpl-dataflow

11
推荐指数
1
解决办法
1871
查看次数

使用async/await并使用TPL Dataflow返回yield

我正在尝试使用实现数据处理管道TPL Dataflow.但是,我对数据流相对较新,并不完全确定如何正确使用它来解决我想要解决的问题.

问题:

我试图遍历文件列表并处理每个文件以读取一些数据,然后进一步处理该数据.每个文件大致700MB1GB大小.每个文件都包含JSON数据.为了并行处理这些文件,而不是的运行内存,我试图使用IEnumerable<>yield return再进一步处理数据.

获得文件列表后,我希望一次最多处理4-5个文件.我的困惑来自:

  • 如何使用IEnumerable<>yeild return使用async/await和数据流.碰上了这个答案svick,但仍然不知道如何转换IEnumerable<>ISourceBlock,然后所有块连接在一起,并跟踪完成.
  • 在我的情况下,producer将非常快(通过文件列表),但consumer将非常慢(处理每个文件 - 读取数据,反序列化JSON).在这种情况下,如何跟踪完成情况.
  • 我应该使用LinkTo数据块的功能来连接各种块吗?或者使用诸如OutputAvailableAsync()和之类的方法ReceiveAsync()将数据从一个块传播到另一个块.

代码:

private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;

public Task ProduceAsync()
{
    PrepareDataflow(token);
    var bufferTask = ListFilesAsync(_fileBufferBlock, token);

    var tasks = …
Run Code Online (Sandbox Code Playgroud)

c# ienumerable yield-return async-await tpl-dataflow

11
推荐指数
2
解决办法
3851
查看次数

这是TPL Dataflow的工作吗?

我在不同的任务上运行一个非常典型的生产者/消费者模型

Task1:从二进制文件中读取一批byte [],并为每个字节数组集合启动一个新任务.(该操作是为了内存管理目的而批量处理的).

任务2-n:这些是工作任务,每个都在字节数组的传入集合(来自Tasks1)上运行,并对字节数组进行反序列化,按特定条件对它们进行排序,然后存储结果对象的集合(每个字节数组)在"并发字典"中反序列化为此类对象.

任务(n + 1)我选择了并发字典,因为此任务的工作是以与它们来自Task1的顺序相同的顺序合并存储在并发字典中的那些集合.我通过传递一个collectionID(它是int类型并为Task1中的每个新集合递增)从Task1到此任务一直向下实现.此任务基本上检查下一个预期的collectionID是否已存储在并发字典中,如果是,则将其取出,将其添加到Final Queue并检查并发字典中的下一个集合.

现在,从我所看到的和我观看的视频看来,TPL Dataflow可能是这种生产者/消费者模型的完美候选者.我似乎无法设计并因此开始,因为我从未使用过TPL Dataflow.在吞吐量和延迟方面,这个库甚至可以完成任务吗?我目前处理250万字节数组,因此在生成的集合中每秒处理对象.TPL Dataflow可以帮助简化吗?我对以下问题的答案特别感兴趣:TPL Dataflow可以在产生工作任务时保留Task1中的集合批次的顺序,并在工作任务完成后重新合并它们吗?它是否优化了什么?在对整个结构进行分析之后,我觉得由于旋转和涉及太多并发集合而浪费了相当多的时间.

任何想法,想法?

c# concurrency producer-consumer task-parallel-library tpl-dataflow

10
推荐指数
1
解决办法
2352
查看次数

在ITargetBlock <TInput>中重试策略

我需要在工作流程中引入重试策略.假设有3个块以这种方式连接:

var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);

buffer.LinkTo(processing);
processing.LinkTo(send);
Run Code Online (Sandbox Code Playgroud)

因此,有一个缓冲区累积数据,然后将其发送到转换块,该转换块一次处理不超过3个项目,然后将结果发送到操作块.

在处理过程中可能会出现变换块瞬态错误,如果错误是瞬态错误,我想重试该块.

我知道块通常不可重试(传递到块中的委托可以重试).其中一个选项是包装传递给支持重试的委托.

我也知道有一个非常好的库TransientFaultHandling.Core可以为瞬态故障提供重试机制.这是一个很棒的图书馆,但不是我的情况.如果我将传递给转换块的委托包装到RetryPolicy.ExecuteAsync方法中,则转换块内的消息将被锁定,并且在重试完成或失败之前,转换块将无法接收新消息.想象一下,如果所有3条消息都输入到重试中(假设下次重试尝试将在2分钟内完成)并且失败,则变换块将被卡住,直到至少有一条消息离开变换块.

我看到的唯一的解决办法是延长TranformBlock(实际上,ITargetBlock将有足够太),和做手工重试(比如从这里):

do
 {
    try { return await transform(input); }
    catch
    { 
        if( numRetries <= 0 ) throw;
        else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
    }
 } while( numRetries-- > 0 );
Run Code Online (Sandbox Code Playgroud)

ig将消息再次放入变换块中,但是在这种情况下,重试上下文(剩余的重试次数等)也应该传递给该块.听起来太复杂了......

有没有人看到更简单的方法来实现工作流程块的重试策略?

c# task-parallel-library tpl-dataflow

10
推荐指数
1
解决办法
1228
查看次数

强制任务继续当前线程?

我正在为.NET建立一个AKKA框架的端口(现在不要太认真了,现在这是一个周末黑客的演员部分)

我对其中的"未来"支持存在一些问题.在Java/Scala Akka中,期货将与Await电话同步等待.很像.NET Task.Wait()

我的目标是支持真正的异步等待.它现在可以工作,但是在我当前的解决方案中,继续在错误的线程上执行.

这是将消息传递给我的一个包含未来await块的actor的结果.如您所见,actor总是在同一个线程上执行,而await块在随机线程池线程上执行.

actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...
Run Code Online (Sandbox Code Playgroud)

actor使用DataFlow获取消息BufferBlock<Message> 或者更确切地说,我在缓冲区块上使用RX来订阅消息.它配置如下:

var messages = new BufferBlock<Message>()
{
        BoundedCapacity = 100,
        TaskScheduler = TaskScheduler.Default,
};
messages.AsObservable().Subscribe(this);
Run Code Online (Sandbox Code Playgroud)

到现在为止还挺好.

但是,当我等待未来的结果时.像这样:

protected override void OnReceive(IMessage message)
{
    ....

    var result = await Ask(logger, m);
    // This is not executed on the same thread as the above code
    result.Match()  
       .With<SomeMessage>(t => {
       Console.WriteLine("await thread {0}", …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive akka async-await tpl-dataflow

10
推荐指数
1
解决办法
1971
查看次数

TPL Dataflow管道设计基础知识

我尝试使用系统资源的最佳使用来创建设计良好的TPL数据流管道.我的项目是一个HTML解析器,它将解析后的值添加到SQL Server DB中.我已经拥有了我未来管道的所有方法,现在我的问题是将它们放在Dataflow块中的最佳方法是什么,以及我应该使用多少块?一些方法是CPU绑定的,其中一些方法 - I/O绑定(从Internet加载,SQL Server数据库查询).现在我认为将每个I/O操作放在单独的块中就像这个方案一样正确: TPL数据流管道

在这种情况下设计管道的基本规则是什么?

.net pipeline task-parallel-library tpl-dataflow

10
推荐指数
1
解决办法
944
查看次数

Dataflow管道中的全局每块错误处理

我正在设计一个由多个块组成的长时间运行的Dataflow管道.项目被输入到管道的输入块,最终通过它,并在最后的UI中显示(作为对用户的礼貌 - 管道的真正工作是将处理结果保存到磁盘).

由于各种原因(输入错误,网络故障,计算错误等),管道块内的lambda函数可能会抛出异常.在这种情况下,我不想破坏整个管道,而是想要触发违规项目,并在"错误"下的UI中显示它.

最好的方法是什么?我知道我可以在try/catch中包装每个lambda函数:

var errorLoggingBlock = new ActionBlock<Tuple<WorkItem, Exception>>(...)

var workerBlock = new TransformBlock<WorkItem, WorkItem>(item => 
{
    try {
        return DoStuff(item);
    } catch (Exception ex) {
        errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
        return null;
    }
}
Run Code Online (Sandbox Code Playgroud)

但是我在管道中有大约10个块,并且将代码复制/粘贴到每个块中似乎很愚蠢.此外,我不喜欢返回null的想法,因为现在所有下游块都必须检查它.

我的下一个最好的想法是创建一个函数,返回一个为我做包装的lambda:

  private Func<TArg, TResult> HandleErrors<TArg, TResult>(Func<TArg, TResult> f) where TArg:WorkItem
  {
     return arg =>
     {
        try {
           return f(arg);
        } catch (Exception ex) {
           errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
           return default(TResult);
        }
     };
  }
Run Code Online (Sandbox Code Playgroud)

但这似乎有点过分.有没有更好的办法 ?

c# error-handling tpl-dataflow

10
推荐指数
1
解决办法
628
查看次数