标签: tpl-dataflow

优先化的TPL DataFlow BufferBlock

它应该是非常自然的东西,我想知道是否有来自TPL DataFlow库的Prioritized BufferBlock的现成实现?

priority-queue task-parallel-library tpl-dataflow

7
推荐指数
1
解决办法
1324
查看次数

下游的TPL数据流块如何获取源生成的数据?

我正在使用TPL Dataflow处理图像.我收到处理请求,从流中读取图像,应用几个转换,然后将生成的图像写入另一个流:

Request -> Stream -> Image -> Image ... -> Stream
Run Code Online (Sandbox Code Playgroud)

为此,我使用块:

BufferBlock<Request>
TransformBlock<Request,Stream>
TransformBlock<Stream,Image>
TransformBlock<Image,Image>
TransformBlock<Image,Image>
...
writerBlock = new ActionBlock<Image>
Run Code Online (Sandbox Code Playgroud)

问题是初始Request是包含创建结果所需的一些数据Stream以及我需要的一些额外信息.我是否必须将原始Request(或其他一些上下文对象)传递给writerBlock所有其他块,如下所示:

TransformBlock<Request,Tuple<Request,Stream>>
TransformBlock<Tuple<Request,Stream>,Tuple<Request,Image>>
TransformBlock<Tuple<Request,Image>,Tuple<Request,Image>>
...
Run Code Online (Sandbox Code Playgroud)

(这很难看),还是有办法将第一个块链接到最后一个块(或者,推广到需要附加数据的块)?

.net c# task-parallel-library tpl-dataflow

7
推荐指数
1
解决办法
2362
查看次数

具有有界容量的变换块中的TPL数据流异常

我需要构建TPL数据流管道,它将处理大量消息.因为有很多消息我不能简单地将Post它们放入无限队列中,BufferBlock否则我将面临内存问题.所以我想使用BoundedCapacity = 1选项来禁用队列并使用MaxDegreeOfParallelism并行任务处理,因为我的TransformBlocks可能需要一些时间来处理每条消息.我也用来PropagateCompletion完成所有的完成并且无法沿管道向下传播.

但是我正面临着错误处理的问题,当错误发生在第一条消息之后:调用await SendAsync只是将我的应用程序切换到无限等待.

我已将我的案例简化为示例控制台应用程序:

var data_buffer = new BufferBlock<int> (new DataflowBlockOptions
                                        {
                                            BoundedCapacity = 1
                                        });

var process_block = new ActionBlock<int> (x => { throw new InvalidOperationException (); },
                                            new ExecutionDataflowBlockOptions
                                            {
                                                MaxDegreeOfParallelism = 2,
                                                BoundedCapacity = 1
                                            });

data_buffer.LinkTo (process_block, new DataflowLinkOptions { PropagateCompletion = true });


for (var k = 1; k <= 5; k++)
{
    await data_buffer.SendAsync (k);
    Console.WriteLine ("Send: {0}", …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library tpl-dataflow

7
推荐指数
1
解决办法
2825
查看次数

使用TPL Dataflow,我可以取消所有帖子然后添加一个吗?

使用TPL Dataflow库,我想做这样的事情:

myActionBlock.Post(newValue, cancelAllPreviousPosts: true);
Run Code Online (Sandbox Code Playgroud)

似乎ActionBlock上的取消令牌取消了整个事情; 如果我设置了那个,我必须创建一个新的ActionBlock.是否可以使用ActionBlock进行部分取消?

尚未尝试尚未处理的帖子.如果有一些取消令牌可用于检查当前正在执行的帖子,那将是很好的.

c# task-parallel-library cancellationtokensource tpl-dataflow

7
推荐指数
1
解决办法
1101
查看次数

如何知道TPL Dataflow Block是否忙?

TPL数据流块具有.InputCount.OutputCount属性.但是它现在可以执行项目执行,并且没有类似的属性.Busy [Boolean].那么有没有办法知道块现在是否正在运行且其中一个项目仍在那里?

在此输入图像描述

更新:

让我解释一下我的问题.pic上是我目前的Dataflow网络方案. BufferBlock保存要加载的URL,TransformBlock通过代理服务器加载页面的数量,ActionBlock最后执行加载页面的工作.TransformBlocks已预定义.BoundedCapacity,因此BufferBlock等待任何TransformBlocks变为空闲然后将项目发布到其中.

最初我发布所有网址Buffer Block.此外,如果TransformBlock在加载HTML期间抛出异常之一,则返回它的URL BufferBlock.所以我的目标是等到我的所有URL都被保证加载和解析.现在我等着这样:

Do While _BufferBlock.Count > 0 Or _ 
         GetLoadBlocksTotalInputOutputCount(_TransformBlocks) > 0 Or _ 
         _ActionBlock.InputCount > 0

        Await Task.Delay(1000)
Loop
Run Code Online (Sandbox Code Playgroud)

然后我打电话给TransformBlock.Complete他们所有人.但在这种情况下,仍然可以有最后的URL加载它TransformBlock.如果最后一个URL未成功加载,它将变为"丢失",因为TransformBlocks都不会将其取回.这就是为什么我想知道TransformBlocks是否还在运行.抱歉,我的英语不好.

在此输入图像描述

.net c# block task-parallel-library tpl-dataflow

7
推荐指数
1
解决办法
1572
查看次数

如何在TPL Dataflow中设置/获取/使用块的名称?

MSDN文档显示该类有一个NameFormat属性DataflowBlockOptions,描述如下:

获取或设置在查询块名称时使用的格式字符串.

那么......你怎么设置这个名字?这个名字怎么样?什么时候使用?

或者......正如我怀疑的那样......这只是一个实际上没有实现的设计残余?

.net c# task-parallel-library tpl-dataflow

7
推荐指数
1
解决办法
447
查看次数

TPL Dataflow如何删除块之间的链接

我想知道.如何删除块之间的链接?换一种说法.我想与LinkTo相反.

我想写一个基于tlp数据流的记录器.

我编写了这个接口,并希望在需要时删除ILogListener的订阅.

public interface ILogManager
{
    void RemoveListener(ILogListener listener);
}
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library tpl-dataflow

7
推荐指数
1
解决办法
588
查看次数

替代Dataflow BroadcastBlock,保证交付

我需要有一些像BroadcastBlock一样的对象,但保证交付.所以我用了这个问题的答案.但我真的不清楚这里的执行流程.我有一个控制台应用程序.这是我的代码:

static void Main(string[] args)
{
    ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
    List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();

    for (int i = 0; i <= 10; i++)
        blocks.Add(new ActionBlock<int>(num => 
        {
            int coef = i;
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef); 
        }, execopt));

    ActionBlock<int> broadcaster = new ActionBlock<int>(async num => 
    {
        foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
    }, execopt);

    broadcaster.Completion.ContinueWith(task =>
        {
            foreach (ActionBlock<int> block in blocks) block.Complete();
        });

    Task producer = …
Run Code Online (Sandbox Code Playgroud)

c# multithreading task-parallel-library async-await tpl-dataflow

7
推荐指数
1
解决办法
1569
查看次数

将数据流拆分为小作业,然后再进行分组

我需要做这样的工作:

  1. 从数据库中获取Page对象
  2. 为每个页面获取所有图像并处理它们(IO绑定,例如,上传到CDN)
  3. 如果所有图像都成功完成,则将页面标记为在数据库中处理

由于我需要控制并行处理的页数,我决定使用TPL数据流:

 ____________________________
|         Data pipe          |
|   BufferBlock<Page>        |
|   BoundedCapacity = 1      |
|____________________________|
              |
 ____________________________
|       Process images       |
| TransformBlock<Page, Page> |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 8 |
|____________________________|
              |
 ____________________________
|        Save page           |
| ActionBlock<Page>          |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 5 |
|____________________________|
Run Code Online (Sandbox Code Playgroud)

现在我需要"过程图像"来并行处理图像,但我想限制我在当前工作中所有并行页面上处理的图像数量.

我可以将TrasnformManyBlock用于"过程图像",但如何在"保存页面"块中将它们收回?

         ____________________________
        |         Data pipe          |
        |   BufferBlock<Page>        |
        |   BoundedCapacity = 1      |
        |____________________________|
                      |
     ___________________________________
    |           Load …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library tpl-dataflow

7
推荐指数
1
解决办法
1357
查看次数

BoundedCapacity是否包含当前正在TPL数据流中处理的项目?

BoundedCapacity限制是否仅包括等待处理的输入队列中的项目,还是还计算当前正在处理的项目?

让我们举个例子ActionBlock:

var block = new ActionBlock<int>(
    i => Console.WriteLine(i),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 1000,
        MaxDegreeOfParallelism = 10,
    });
Run Code Online (Sandbox Code Playgroud)

如果当前有5个项目并行处理.这是否意味着输入队列可以这些项目之上容纳1000个项目,或仅仅995个?

.net c# task-parallel-library tpl-dataflow

7
推荐指数
1
解决办法
1393
查看次数