我有一个链接到基于谓词的动作块的转换块。
// Blocks
public TransformBlock<Document, Document> DocumentCreationTransformBlock =
new TransformBlock<Document, Document>(async document =>
{
return await CreateAsync(document); // REST API call that sets document.NewId
},
new ExecutionDataflowBlockOptions {
BoundedCapacity = 100,
MaxDegreeOfParallelism = 20
});
public ActionBlock<Document> SplitPipelineActionBlock =
new ActionBlock<Document>(async document =>
{ // implementation obfuscated
},
new ExecutionDataflowBlockOptions {
BoundedCapacity = 100
};
// Shared block elements
public DataflowLinkOptions CommonLinkOptions = new DataflowLinkOptions {
PropagateCompletion = true };
// Link mesh
DocumentCreationTransformBlock.LinkTo(SplitPipelineActionBlock,
CommonLinkOptions,
document => !string.IsNullOrEmpty(document.NewId));
DocumentCreationTransformBlock.LinkTo(DataflowBlock.NullTarget<Document>(),
CommonLinkOptions);
Run Code Online (Sandbox Code Playgroud)
转换块尝试通过 REST API 创建文档。它应该Document用NewId. 因此 LinkTo 谓词检查返回的 Document 是否具有 NewId。
对于不符合此条件的任何对象,有一个NullTarget块可以清空TransformBlock.
在我的测试中,我向管道发布了 10,100 个项目,并确认所有项目都NewId成功返回。但是,有 130 个项目被传递到NullTarget. 当我再次在整个集合上重新运行程序时,有 3000 多个项目被传递到NullTarget. 即使是之前成功存储了NewId.
我怀疑存在SplitPipelineActionBlock BoundedCapacity已满的问题,并且LinkTo只是忽略谓词然后传递要由下一个 LinkTo 处理的项目,即NullTarget.
我怎样才能让所有项目都有机会被发送到SplitPipeLineAction块?
当源块中的项目可用时,它会一次一个地将其提供给其链接;如果任何链接未收到该项目,则将该项目提供给下一个链接。没有区分为什么不使用链接。块确实将“可能稍后”回复与“否”回复(PostponedvsDeclined)区分开来,但在任何一种情况下,都会尝试下一个链接以获取现在可以接受的链接块。
解决此问题的最佳选择是向空块链接添加一个谓词,该谓词是目标块链接的谓词的否定。
Predicate<Document> predicate = document => !string.IsNullOrEmpty(document.NewId);
DocumentCreationTransformBlock.LinkTo(SplitPipelineActionBlock,
CommonLinkOptions,
predicate);
DocumentCreationTransformBlock.LinkTo(DataflowBlock.NullTarget<Document>(),
CommonLinkOptions,
document => !predicate(document));
Run Code Online (Sandbox Code Playgroud)
这样,当SplitPipelineActionBlock已满时,项目被提供给空链接,由于谓词失败而被拒绝,项目保留在变换块输出缓冲区中,直到SplitPipelineActionBlock有空间为止。
| 归档时间: |
|
| 查看次数: |
385 次 |
| 最近记录: |