它应该是非常自然的东西,我想知道是否有来自TPL DataFlow库的Prioritized BufferBlock的现成实现?
我正在使用TPL Dataflow处理图像.我收到处理请求,从流中读取图像,应用几个转换,然后将生成的图像写入另一个流:
Request -> Stream -> Image -> Image ... -> Stream
Run Code Online (Sandbox Code Playgroud)
为此,我使用块:
BufferBlock<Request>
TransformBlock<Request,Stream>
TransformBlock<Stream,Image>
TransformBlock<Image,Image>
TransformBlock<Image,Image>
...
writerBlock = new ActionBlock<Image>
Run Code Online (Sandbox Code Playgroud)
问题是初始Request是包含创建结果所需的一些数据Stream以及我需要的一些额外信息.我是否必须将原始Request(或其他一些上下文对象)传递给writerBlock所有其他块,如下所示:
TransformBlock<Request,Tuple<Request,Stream>>
TransformBlock<Tuple<Request,Stream>,Tuple<Request,Image>>
TransformBlock<Tuple<Request,Image>,Tuple<Request,Image>>
...
Run Code Online (Sandbox Code Playgroud)
(这很难看),还是有办法将第一个块链接到最后一个块(或者,推广到需要附加数据的块)?
我需要构建TPL数据流管道,它将处理大量消息.因为有很多消息我不能简单地将Post它们放入无限队列中,BufferBlock否则我将面临内存问题.所以我想使用BoundedCapacity = 1选项来禁用队列并使用MaxDegreeOfParallelism并行任务处理,因为我的TransformBlocks可能需要一些时间来处理每条消息.我也用来PropagateCompletion完成所有的完成并且无法沿管道向下传播.
但是我正面临着错误处理的问题,当错误发生在第一条消息之后:调用await SendAsync只是将我的应用程序切换到无限等待.
我已将我的案例简化为示例控制台应用程序:
var data_buffer = new BufferBlock<int> (new DataflowBlockOptions
{
BoundedCapacity = 1
});
var process_block = new ActionBlock<int> (x => { throw new InvalidOperationException (); },
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 2,
BoundedCapacity = 1
});
data_buffer.LinkTo (process_block, new DataflowLinkOptions { PropagateCompletion = true });
for (var k = 1; k <= 5; k++)
{
await data_buffer.SendAsync (k);
Console.WriteLine ("Send: {0}", …Run Code Online (Sandbox Code Playgroud) 使用TPL Dataflow库,我想做这样的事情:
myActionBlock.Post(newValue, cancelAllPreviousPosts: true);
Run Code Online (Sandbox Code Playgroud)
似乎ActionBlock上的取消令牌取消了整个事情; 如果我设置了那个,我必须创建一个新的ActionBlock.是否可以使用ActionBlock进行部分取消?
尚未尝试尚未处理的帖子.如果有一些取消令牌可用于检查当前正在执行的帖子,那将是很好的.
c# task-parallel-library cancellationtokensource tpl-dataflow
TPL数据流块具有.InputCount和.OutputCount属性.但是它现在可以执行项目执行,并且没有类似的属性.Busy [Boolean].那么有没有办法知道块现在是否正在运行且其中一个项目仍在那里?

更新:
让我解释一下我的问题.pic上是我目前的Dataflow网络方案.
BufferBlock保存要加载的URL,TransformBlock通过代理服务器加载页面的数量,ActionBlock最后执行加载页面的工作.TransformBlocks已预定义.BoundedCapacity,因此BufferBlock等待任何TransformBlocks变为空闲然后将项目发布到其中.
最初我发布所有网址Buffer Block.此外,如果TransformBlock在加载HTML期间抛出异常之一,则返回它的URL BufferBlock.所以我的目标是等到我的所有URL都被保证加载和解析.现在我等着这样:
Do While _BufferBlock.Count > 0 Or _
GetLoadBlocksTotalInputOutputCount(_TransformBlocks) > 0 Or _
_ActionBlock.InputCount > 0
Await Task.Delay(1000)
Loop
Run Code Online (Sandbox Code Playgroud)
然后我打电话给TransformBlock.Complete他们所有人.但在这种情况下,仍然可以有最后的URL加载它TransformBlock.如果最后一个URL未成功加载,它将变为"丢失",因为TransformBlocks都不会将其取回.这就是为什么我想知道TransformBlocks是否还在运行.抱歉,我的英语不好.

MSDN文档显示该类有一个NameFormat属性DataflowBlockOptions,描述如下:
获取或设置在查询块名称时使用的格式字符串.
那么......你怎么设置这个名字?这个名字怎么样?什么时候使用?
或者......正如我怀疑的那样......这只是一个实际上没有实现的设计残余?
我想知道.如何删除块之间的链接?换一种说法.我想与LinkTo相反.
我想写一个基于tlp数据流的记录器.
我编写了这个接口,并希望在需要时删除ILogListener的订阅.
public interface ILogManager
{
void RemoveListener(ILogListener listener);
}
Run Code Online (Sandbox Code Playgroud) 我需要有一些像BroadcastBlock一样的对象,但保证交付.所以我用了这个问题的答案.但我真的不清楚这里的执行流程.我有一个控制台应用程序.这是我的代码:
static void Main(string[] args)
{
ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();
for (int i = 0; i <= 10; i++)
blocks.Add(new ActionBlock<int>(num =>
{
int coef = i;
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef);
}, execopt));
ActionBlock<int> broadcaster = new ActionBlock<int>(async num =>
{
foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
}, execopt);
broadcaster.Completion.ContinueWith(task =>
{
foreach (ActionBlock<int> block in blocks) block.Complete();
});
Task producer = …Run Code Online (Sandbox Code Playgroud) c# multithreading task-parallel-library async-await tpl-dataflow
我需要做这样的工作:
由于我需要控制并行处理的页数,我决定使用TPL数据流:
____________________________
| Data pipe |
| BufferBlock<Page> |
| BoundedCapacity = 1 |
|____________________________|
|
____________________________
| Process images |
| TransformBlock<Page, Page> |
| BoundedCapacity = 1 |
| MaxDegreeOfParallelism = 8 |
|____________________________|
|
____________________________
| Save page |
| ActionBlock<Page> |
| BoundedCapacity = 1 |
| MaxDegreeOfParallelism = 5 |
|____________________________|
Run Code Online (Sandbox Code Playgroud)
现在我需要"过程图像"来并行处理图像,但我想限制我在当前工作中所有并行页面上处理的图像数量.
我可以将TrasnformManyBlock用于"过程图像",但如何在"保存页面"块中将它们收回?
____________________________
| Data pipe |
| BufferBlock<Page> |
| BoundedCapacity = 1 |
|____________________________|
|
___________________________________
| Load …Run Code Online (Sandbox Code Playgroud) BoundedCapacity限制是否仅包括等待处理的输入队列中的项目,还是还计算当前正在处理的项目?
让我们举个例子ActionBlock:
var block = new ActionBlock<int>(
i => Console.WriteLine(i),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 1000,
MaxDegreeOfParallelism = 10,
});
Run Code Online (Sandbox Code Playgroud)
如果当前有5个项目并行处理.这是否意味着输入队列可以在这些项目之上容纳1000个项目,或仅仅995个?