标签: tpl-dataflow

TPL DataFlow与BlockingCollection

我知道a BlockingCollection最适合消费者/生产者模式.然而,当我使用一个ActionBlockTPL数据流库?

我最初的理解是针对IO操作,保持BlockingCollection同时CPU密集型操作适合于ActionBlock.但我觉得这不是整个故事...任何额外的见解?

.net task-parallel-library data-synchronization tpl-dataflow

6
推荐指数
1
解决办法
3154
查看次数

在TPL数据流中保证交付的BroadcastBlock

我有一个数据流,我以几种不同的方式处理...所以我想发送每个消息的副本我得到多个目标,以便这些目标可以并行执行...但是,我需要设置BoundedCapacity在我的块上,因为数据的流式传输方式比我的目标可以处理的速度快,并且有大量数据.如果没有BoundedCapacity,我会很快耗尽内存.

但是问题是如果目标无法处理它,BroadcastBlock将丢弃消息(由于BoundedCapacity).

我需要的是一个不会丢弃消息的BroadcastBlock,但实质上会拒绝其他输入,直到它可以向每个目标传递消息然后准备好更多.

有这样的东西,还是有人编写了一个以这种方式运行的自定义块?

task-parallel-library tpl-dataflow

6
推荐指数
1
解决办法
1487
查看次数

定义了boundedcapacity的贪婪和非贪婪数据流块之间的差异

我有一个BatchBlockBoundedCapacity在其上定义

var _batchBlock = new BatchBlock<int>(2, new GroupingDataflowBlockOptions
                                      {BoundedCapacity = 100 });
Run Code Online (Sandbox Code Playgroud)

因此,如果队列容量达到100,则该块会推迟收到的每条消息,直到某个地点可用.在这种情况下,批处理队列被认为是贪婪还是非贪婪?

.net c# task-parallel-library tpl-dataflow

6
推荐指数
1
解决办法
1806
查看次数

我应该如何使用 DataflowBlockOptions.CancellationToken?

我该如何使用DataflowBlockOptions.CancellationToken

BufferBlock如果我创建这样的实例:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = _cts.Token });

那么使用使用的消费者/生产者方法queue,我如何使用它的 CancellationToken 来处理取消?

例如,在生产者方法中,如何检查取消令牌 - 我没有找到任何属性来访问令牌。

编辑:生产/消费方法示例:

private static async Task Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
    foreach (var value in values)
    {
        await queue.SendAsync(value);
    }

    queue.Complete();
}

private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
{
    var ret = new List<int>();
    while (await queue.OutputAvailableAsync())
    {
        ret.Add(await queue.ReceiveAsync());
    }

    return ret;
}
Run Code Online (Sandbox Code Playgroud)

调用它的代码:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = …
Run Code Online (Sandbox Code Playgroud)

c# tpl-dataflow

6
推荐指数
1
解决办法
1893
查看次数

取消数据流管道中的特定项目

我正在构建一个Dataflows管道,它的工作就是处理大文件.每个文件都经过解析,分析和渲染; 但是每个文件可能在管道中采用不同的路径,具体取决于它是什么类型的文件.

此管道的用户界面包含要处理的文件列表,以及进度条和每个文件旁边的"取消"按钮(当然,还有一个用于向队列添加新文件的按钮).当用户单击特定文件旁边的"取消"按钮时,我想从管道中删除该文件.

我必须遗漏一些东西,因为我无法弄清楚如何做到这一点.我知道我可以取消整个块,但我不想这样做,我只想取消管道中的单个项目.那么,我错过了什么?

c# task-parallel-library cancellation tpl-dataflow

6
推荐指数
1
解决办法
443
查看次数

使用TPL Dataflow封装以动作块结尾的管道

TPL Dataflow提供了非常有用的功能:

public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
    ITargetBlock<TInput> target, 
    ISourceBlock<TOutput> source)
Run Code Online (Sandbox Code Playgroud)

使您能够将多个块封装到单个转换块中.它返回一个

IPropagatorBlock<TInput, TOutput>
Run Code Online (Sandbox Code Playgroud)

它代表管道的起始和结束块.

但是,如果我的管道中的最后一个块是ActionBlock,我不能使用它,因为ActionBlock不是SourceBlock,并且函数的返回类型将是ITargetBlock,而不是IPropagatorBlock.

基本上,我正在寻找的是这样的功能:

public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
        ITargetBlock<TStart> startBlock, 
        ActionBlock<TEnd> endBlock)
Run Code Online (Sandbox Code Playgroud)

这是一个明智的事情,或者我错过了一些简单的东西?我不太清楚如何写呢-尤其是类设置完成.我需要创建自己的自定义块类型吗?

编辑:

好的,所以看过@Panagiotis Kanavos的回复,做了一些修补,我想出了这个.这基于EncapsulatingPropagator类,它是现有DataflowBlock.Encapsulate方法使用的类:

internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
        private readonly ITargetBlock<TStart> startBlock;

        private readonly ActionBlock<TEnd> endBlock;

        public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
        {
            this.startBlock = startBlock;
            this.endBlock = endBlock;
        }

        public Task Completion
        {
            get { return this.endBlock.Completion; }
        }

        public void Complete()
        {
            this.startBlock.Complete();
        }

        void IDataflowBlock.Fault(Exception …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library tpl-dataflow

6
推荐指数
1
解决办法
1712
查看次数

IO读写操作的TPL Dataflow实现中的内存问题

我试图使用文件IO操作实现读写操作,并将这些操作封装在一起TransformBlock,以使这些操作线程安全,而不是使用锁定机制.

但问题是,当我尝试并行写入5个文件时,有一个异常的内存,并且在使用此实现时它阻止了UI线程.该实现在Windows Phone项目中完成.请说明这个实现有什么问题.

文件IO操作

public static readonly IsolatedStorageFile _isolatedStore = IsolatedStorageFile.GetUserStoreForApplication();
public static readonly FileIO _file = new FileIO();
public static readonly ConcurrentExclusiveSchedulerPair taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
public static readonly ExecutionDataflowBlockOptions exclusiveExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ExclusiveScheduler,
    BoundedCapacity = 1
};

public static readonly ExecutionDataflowBlockOptions concurrentExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ConcurrentScheduler,
    BoundedCapacity = 1
};

public static async Task<T> LoadAsync<T>(string fileName)
{
    T result = default(T);

    var transBlock = new TransformBlock<string, T>
       (async fName …
Run Code Online (Sandbox Code Playgroud)

c# file-io multithreading dataflow tpl-dataflow

6
推荐指数
1
解决办法
499
查看次数

TransformBlock发布到输出

我的情况是我有一个来自外部源的BufferBlock<Stream>接收Stream,比方说文件系统或一些FTP服务器.这些文件Stream将传递到另一个块并进行处理.

唯一的问题是其中一些文件是压缩的,我想Block在中间添加一个可以在必要时解压缩文件,并Stream为每个条目创建多个输出.

但是我不想使用TransformBlockMany,因为这意味着我必须完全接收ZIP Stream并立即创建输出Stream数组.

我希望这Block能够接收ZIP Stream,开始解压缩,并Push在条目就绪时进入下一个流,因此Process Block可以在第一个文件解压缩后立即开始处理,而不是等到所有内容都解压缩.

我该怎么做呢?

.net tpl-dataflow

6
推荐指数
1
解决办法
240
查看次数

TPL 数据流:将传入集合展平为顺序项目

我正在使用 TPL 数据流构建应用程序。其实我有以下问题。我有一个 transformblock var tfb1 = new TranformBlock<InMsg, IReadOnlyCollection<OutMsg>>。所以tfb1接收消息并创建一个输出消息列表。此输出消息列表应链接到路由器数据块,该数据块OutMsg作为输入接收(而不是IReadOnlyCollection<OutMsg>)。

我怎样才能展平IReadOnlyCollection以便包含的消息可以用作输入,例如TransformBlock<OutMsg, SomeOtherType>. 可以通过LinkTo()吗?

谢谢

c# tpl-dataflow

6
推荐指数
1
解决办法
712
查看次数

如何在 TPL Dataflow 中正确管理 Completion

我创建了类似于网络爬虫的东西来创建我需要管理的 1000 多个 Web 服务的报告。因此,我创建了一个 TPL 数据流管道来管理获取和处理数据。我想象的管道看起来有点像这样(对不起我的绘画技巧:D): 管道

我已经创建了一个实现并且一切正常,直到我开始作为一个整体开始我的管道。我将 500 个对象作为管道的输入提供给管道,并希望程序运行一段时间,但程序在移动到执行块后停止执行。在检查程序的流程后,在我看来,完成快速传播到处置块。我使用相同的管道创建了一个小示例项目,以检查它是我对输入类的实现还是管道本身。示例代码是这样的:

public class Job
{
    public int Ticker { get; set; }

    public Type Type { get; }

    public Job(Type type)
    {
        Type = type;
    }

    public Task Prepare()
    {
        Console.WriteLine("Preparing");
        Ticker = 0;
        return Task.CompletedTask;
    }

    public Task Tick()
    {
        Console.WriteLine("Ticking");
        Ticker++;
        return Task.CompletedTask;
    }

    public bool IsCommitable()
    {
        Console.WriteLine("Trying to commit");
        return IsFinished() || ( Ticker != 0 && Ticker % 100000 == 0);
    }

    public bool IsFinished()
    { …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library tpl-dataflow

6
推荐指数
1
解决办法
1191
查看次数