标签: tpl-dataflow

在ActionBlock中等待异步lambda

我有一个带有ActionBlock的类Receiver:

public class Receiver<T> : IReceiver<T>
{

  private ActionBlock<T> _receiver;

  public Task<bool> Send(T item) 
  {
     if(_receiver!=null)
        return _receiver.SendAsync(item);

     //Do some other stuff her
  }

  public void Register (Func<T, Task> receiver)
  {
    _receiver = new ActionBlock<T> (receiver);
  }

  //...
}
Run Code Online (Sandbox Code Playgroud)

ActionBlock的Register-Action是一个带有await-Statement的async-Method:

private static async Task Writer(int num)
{
   Console.WriteLine("start " + num);
   await Task.Delay(500);
   Console.WriteLine("end " + num);
}
Run Code Online (Sandbox Code Playgroud)

现在我想做的是同步等待(如果设置了条件),直到action方法完成以获得独占行为:

var receiver = new Receiver<int>();
receiver.Register((Func<int, Task) Writer);
receiver.Send(5).Wait(); //does not wait the action-await here!
Run Code Online (Sandbox Code Playgroud)

问题是"await Task.Delay(500);" 语句被执行,"receiver.Post(5).Wait();" 不再等了.

我尝试了几种变体(TaskCompletionSource,ContinueWith,...),但它不起作用.

有谁知道如何解决这个问题?

c# asynchronous task-parallel-library async-await tpl-dataflow

6
推荐指数
1
解决办法
2969
查看次数

散列/分片ActionBlocks

我有一些不断流动的某些项目,我需要并行处理,所以我正在使用TPL Dataflow.问题在于,共享相同密钥(类似于字典)的项目应按FIFO顺序处理,而不是彼此平行(它们可以与具有不同值的其他项目并行).

正在完成的工作是CPU绑定最小的异步锁,所以我的解决方案是创建一个ActionBlock<T>大小为s的数组,Environment.ProcessorCount没有并行性,并根据键的GetHashCode值发布到它们.

创建:

_actionBlocks = new ActionBlock<Item>[Environment.ProcessorCount];
for (int i = 0; i < _actionBlocks.Length; i++)
{
    _actionBlocks[i] = new ActionBlock<Item>(_ => ProcessItemAsync(_));
}
Run Code Online (Sandbox Code Playgroud)

用法:

bool ProcessItem(Key key, Item item)
{
    var actionBlock = _actionBlocks[(uint)key.GetHashCode() % _actionBlocks.Length];
    return actionBlock.Post(item);
}
Run Code Online (Sandbox Code Playgroud)

所以,我的问题是,这是我问题的最佳解决方案吗?我是否会损害性能/可扩展性?我错过了什么吗?

.net c# task-parallel-library async-await tpl-dataflow

6
推荐指数
1
解决办法
476
查看次数

TPL DataFlow与BlockingCollection

我知道a BlockingCollection最适合消费者/生产者模式.然而,当我使用一个ActionBlockTPL数据流库?

我最初的理解是针对IO操作,保持BlockingCollection同时CPU密集型操作适合于ActionBlock.但我觉得这不是整个故事...任何额外的见解?

.net task-parallel-library data-synchronization tpl-dataflow

6
推荐指数
1
解决办法
3154
查看次数

使用TPL Dataflow封装以动作块结尾的管道

TPL Dataflow提供了非常有用的功能:

public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
    ITargetBlock<TInput> target, 
    ISourceBlock<TOutput> source)
Run Code Online (Sandbox Code Playgroud)

使您能够将多个块封装到单个转换块中.它返回一个

IPropagatorBlock<TInput, TOutput>
Run Code Online (Sandbox Code Playgroud)

它代表管道的起始和结束块.

但是,如果我的管道中的最后一个块是ActionBlock,我不能使用它,因为ActionBlock不是SourceBlock,并且函数的返回类型将是ITargetBlock,而不是IPropagatorBlock.

基本上,我正在寻找的是这样的功能:

public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
        ITargetBlock<TStart> startBlock, 
        ActionBlock<TEnd> endBlock)
Run Code Online (Sandbox Code Playgroud)

这是一个明智的事情,或者我错过了一些简单的东西?我不太清楚如何写呢-尤其是类设置完成.我需要创建自己的自定义块类型吗?

编辑:

好的,所以看过@Panagiotis Kanavos的回复,做了一些修补,我想出了这个.这基于EncapsulatingPropagator类,它是现有DataflowBlock.Encapsulate方法使用的类:

internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
        private readonly ITargetBlock<TStart> startBlock;

        private readonly ActionBlock<TEnd> endBlock;

        public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
        {
            this.startBlock = startBlock;
            this.endBlock = endBlock;
        }

        public Task Completion
        {
            get { return this.endBlock.Completion; }
        }

        public void Complete()
        {
            this.startBlock.Complete();
        }

        void IDataflowBlock.Fault(Exception …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library tpl-dataflow

6
推荐指数
1
解决办法
1712
查看次数

当消费者不堪重负时,如何让快速生产者暂停?

我在我的应用程序中使用TPL Dataflow实现了生产者/消费者模式.我有大数据流网格,其中有大约40个块.网格中有两个主要功能部分:生产者部分和消费者部分.生产者应该继续为消费者提供大量工作,而消费者有时会缓慢地处理传入的工作.当消费者忙于一些指定数量的工作项时,我想暂停生产者.否则,该应用程序会占用大量内存/ CPU,并且行为不可持续.

我制作了演示应用程序以演示此问题:

啮合

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false
            };

            var boundedOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false,
                BoundedCapacity = 5
            };

            var bufferBlock = new BufferBlock<int>(boundedOptions);
            var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
            var broadcastBlock = new BroadcastBlock<int>(x => x, options);

            var consumerBlock = new …
Run Code Online (Sandbox Code Playgroud)

.net c# dataflow task-parallel-library tpl-dataflow

6
推荐指数
1
解决办法
806
查看次数

TPL Dataflow - Parallel&Async processing, while keeping order

I created a TPL Dataflow pipeline which consist of 3 TransformBlock's and an ActionBlock at the end.

var loadXml = new TransformBlock<Job, Job>(job => { ... }); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }); // Parsing&Validating&Calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }); // Saving to database

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));

loadXml.LinkTo(validateData, job => job.ReturnCode …
Run Code Online (Sandbox Code Playgroud)

c# asynchronous task-parallel-library tpl-dataflow

6
推荐指数
3
解决办法
1544
查看次数

TPL 数据流,JoinBlock 限制的替代方案?

我寻找 JoinBlock 的替代方案,它可以通过 n-TransformBlocks 链接,并将所有 TransformBlock 源块的消息连接/合并在一起,以便将此类的集合传递给另一个数据流块。

JoinBlock 可以很好地完成工作,但仅限于连接 3 个源块。它还存在许多效率低下的问题(连接 2 个源块的偶数值类型(整数)非常慢)。有没有办法让任务从 TransformBlocks 返回并等到所有 TransformBlocks 都有一个完成的任务才能接受Task<item>

任何替代想法?我可能有 1-20 个这样的转换块,在传递连接的项目集合之前,我需要将哪些项目连接在一起。每个转换块都保证为每个“转换”的输入项准确返回一个输出项。

编辑:要求澄清:

根据我之前的一个问题,我按如下方式设置了我的 JoinBlock:

public Test()
{
    broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });

    transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    joinBlock = new JoinBlock<int, int>();

    processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });

    //Linking …
Run Code Online (Sandbox Code Playgroud)

c# concurrency actor-model task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
2681
查看次数

TPL 数据流:在保持秩序的同时设计并行性

我以前从未使用过 TPL,所以我想知道是否可以用它来完成:我的应用程序从很多帧创建了一个 gif 图像动画文件。我从一个表示 gif 文件帧的 Bitmap 列表开始,需要对每一帧执行以下操作:

  1. 在框架上绘制一些文本/位图
  2. 裁剪框架
  3. 调整框架大小
  4. 将图像减少到 256 色

显然,这个过程可以对列表中的所有帧并行完成,但对于每个帧,步骤的顺序需要相同。之后,我需要将所有帧写入 gif 文件。因此,所有帧都需要按照它们在原始列表中的相同顺序接收。最重要的是,这个过程可以在第一帧准备好时开始,不需要等到所有帧都处理完。

所以情况就是这样。TPL Dataflow 适合这个吗?如果是的话,谁能给我一个关于如何设计 tpl 块结构以反映上述过程的正确方向的提示?与我发现的一些样本相比,这对我来说似乎相当复杂。

c# asynchronous dataflow task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
1374
查看次数

如何标记一个 TPL 数据流周期完成?

鉴于 TPL 数据流中的以下设置。

var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");

var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);

var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
    return directory.GetDirectories();

});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
    return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);

var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
    var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
    return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
    XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
        (z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
    () => new TransformBlock<string, string>((s) =>
    {
        Trace.TraceInformation("Combining {0}", s);
        return s;
    })); …
Run Code Online (Sandbox Code Playgroud)

.net c# task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
731
查看次数

TPL-定义的ExecutionDataflowBlockOptions BoundedCapacity降低了性能

有什么方法可以通过TPL节流来限制性能下降吗?

我有一个复杂的组件管道,并试图限制所需的内存需求。我从多个文件中并行读取,管道中的组件可能会从这些文件的随机部分中读取一些内容,其余组件则进行CPU绑定操作。

我使用通用测试方法将性能测试平台简化为这些测试。

private void TPLPerformaceTest(int generateNumbers, ExecutionDataflowBlockOptions transformBlockOptions)
{
    var transformBlock = new TransformBlock<int, int>(i => i, transformBlockOptions);

    var storedCount = 0;
    var generatedCount = 0;
    var store = new ActionBlock<int>(i => Interlocked.Increment(ref storedCount));

    transformBlock.LinkTo(store);
    transformBlock.Completion.ContinueWith(_ => store.Complete());

    for (int i = 0; i < generateNumbers; i++)
    {
        transformBlock.SendAsync(i).Wait(); //To ensure delivery
        Interlocked.Increment(ref generatedCount);
    }
    transformBlock.Complete();
    store.Completion.Wait();

    Assert.IsTrue(generatedCount == generateNumbers);
    Assert.IsTrue(storedCount == generateNumbers);
}
Run Code Online (Sandbox Code Playgroud)

第一个没有节流。在我的CPU上,大约需要12秒钟才能完成,消耗约800MB的RAM,平均CPU利用率约为35%

[Test]
public void TPLPerformaceUnlimitedTest()
{
    var …
Run Code Online (Sandbox Code Playgroud)

c# parallel-processing performance performance-testing tpl-dataflow

5
推荐指数
1
解决办法
908
查看次数