woj*_*rak 4 c# task-parallel-library tpl-dataflow
我有一个由多个块组成的数据流管道。当元素流经我的处理管道时,我想按 field 对它们进行分组A。为此,我有一个BatchBlock高BoundedCapacity. 我在其中存储我的元素,直到我决定应该释放它们。所以我调用TriggerBatch()方法。
private void Forward(TStronglyTyped data)
{
if (ShouldCreateNewGroup(data))
{
GroupingBlock.TriggerBatch();
}
GroupingBlock.SendAsync(data).Wait(SendTimeout);
}
Run Code Online (Sandbox Code Playgroud)
这就是它的样子。问题是,生成的批次有时包含下一个发布的元素,该元素不应该在那里。
为了显示:
BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);
Run Code Online (Sandbox Code Playgroud)
在这一点上,我希望我的批次是{A,A,A},但它是{A,A,A,B}
LikeTriggerBatch()是异步的,SendAsync实际上是在实际批处理之前执行的。
我该如何解决这个问题?我显然不想放在Task.Wait(x)那里(我尝试过,它有效,但当然性能很差)。
我也遇到过这个问题,尝试TriggerBatch在错误的地方调用。如前所述,使用的 SlidingWindow 示例DataflowBlock.Encapsulate就是这里的答案,但需要一些时间来适应,所以我想我应该分享我完成的块。
我ConditionalBatchBlock创建的批次达到最大大小,如果满足特定条件,可能会更快。在我的特定场景中,我需要创建 100 个批次,但在检测到数据中的某些更改时始终创建新批次。
public static IPropagatorBlock<T, T[]> CreateConditionalBatchBlock<T>(int batchSize, Func<Queue<T>, T, bool> condition)
{
var queue = new Queue<T>();
var source = new BufferBlock<T[]>();
var target = new ActionBlock<T>(async item =>
{
// start a new batch if required by the condition
if (condition(queue, item))
{
await source.SendAsync(queue.ToArray());
queue.Clear();
}
queue.Enqueue(item);
// always send a batch when the max size has been reached
if (queue.Count == batchSize)
{
await source.SendAsync(queue.ToArray());
queue.Clear();
}
});
// send any remaining items
target.Completion.ContinueWith(async t =>
{
if (queue.Any())
await source.SendAsync(queue.ToArray());
source.Complete();
});
return DataflowBlock.Encapsulate(target, source);
}
Run Code Online (Sandbox Code Playgroud)
对于您的情况,该condition参数可能更简单。我需要查看队列以及当前项目来确定是否创建新批次。
我这样使用它:
public async Task RunExampleAsync<T>()
{
var conditionalBatchBlock = CreateConditionalBatchBlock<T>(100, (queue, currentItem) => ShouldCreateNewBatch(queue, currentItem));
var actionBlock = new ActionBlock<T[]>(async x => await PerformActionAsync(x));
conditionalBatchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
await ReadDataAsync<T>(conditionalBatchBlock);
await actionBlock.Completion;
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1919 次 |
| 最近记录: |