当您创建具有有界容量的批处理块并调用triggerBatch时(并行)发布新项目 - 在触发批处理执行时间期间发布新项目将失败.
在输入数据流暂停或减速的情况下,调用触发批处理(每X次)以确保数据在块中不会延迟太长时间.
以下代码将输出一些"post failure"事件.例如:
public static void Main(string[] args)
{
var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 });
var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
batchBlock.LinkTo(actionBlock);
var producerTask = Task.Factory.StartNew(() =>
{
//Post 10K Items
for (int i = 0; i < 10000; i++)
{
var postResult = batchBlock.Post(i);
if (!postResult)
Console.WriteLine("Failed to Post");
}
});
var triggerBatchTask = Task.Factory.StartNew(() =>
{
//Trigger Batch..
for (int i = 0; i < 1000000; i++)
batchBlock.TriggerBatch();
});
producerTask.Wait();
triggerBatchTask.Wait();
}
public static void ProcessBatch(int[] batch)
{
Console.WriteLine("{0} - {1}", batch.First(), batch.Last());
}
Run Code Online (Sandbox Code Playgroud)
*请注意,仅当批处理块有界时,此方案才可重现.
我错过了什么或者是否是batchBlock的问题?
该BatchBlock不会真的拒绝的项目,它试图推迟它。除了在 的情况下Post(),推迟不是一种选择。解决此问题的一种简单方法是使用await batchBlock.SendAsync(i)而不是batchBlock.Post(i)(这也意味着您需要更改Task.Factory.StartNew(() =>为Task.Run(async () =>)。
为什么会发生这种情况?根据源代码,如果BatchBlock有界,TriggerBatch()则异步处理,并且在处理过程中,不会接受任何新项目。
在任何情况下,您都不应该期望Post()总是true在有界块上返回,如果块已满,Post()也将返回false。
| 归档时间: |
|
| 查看次数: |
477 次 |
| 最近记录: |