当您创建具有有界容量的批处理块并调用triggerBatch时(并行)发布新项目 - 在触发批处理执行时间期间发布新项目将失败.
在输入数据流暂停或减速的情况下,调用触发批处理(每X次)以确保数据在块中不会延迟太长时间.
以下代码将输出一些"post failure"事件.例如:
public static void Main(string[] args)
{
var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 });
var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
batchBlock.LinkTo(actionBlock);
var producerTask = Task.Factory.StartNew(() =>
{
//Post 10K Items
for (int i = 0; i < 10000; i++)
{
var postResult = batchBlock.Post(i);
if (!postResult)
Console.WriteLine("Failed to Post");
}
});
var triggerBatchTask = Task.Factory.StartNew(() =>
{
//Trigger Batch..
for (int i …Run Code Online (Sandbox Code Playgroud) 我使用BufferBlock和ActionBlock进行生产者/消费者数据流块设置,它在Console应用程序中运行良好;
将所有项目添加到BurfferBlock并将BufferBlock与其他Action项目链接后; 它运作良好.
现在我想使用内部服务,此数据流块管道将始终处于启动状态,并且当消息通过外部事件可用时,它将进入缓冲区块并开始处理.我怎样才能做到这一点?
到目前为止,我在下面做了:
public void SetupPipeline()
{
FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
BufferBlock = new BufferBlock<WorkItem>();
GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
GroupingDataflowBlockOptions.Greedy = true;
GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
CancellationTokenSource = new CancellationTokenSource();
CancellationToken = CancellationTokenSource.Token;
GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);
ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
ProcessWorkItems(WorkItems.ToList<WorkItem>()),
new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});
Timer = new Timer(_ =>
BatchBlock.TriggerBatch()
);
TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
{
Timer.Change(TimerInterval, Timeout.Infinite); …Run Code Online (Sandbox Code Playgroud) 我已经使用TPL数据流实现了producer..consumer模式。用例是代码从Kafka总线读取消息。为了提高效率,我们需要在访问数据库时分批处理消息。
TPL数据流中是否有方法可以保留消息并在达到大小或持续时间阈值时触发?
例如,当前实现将消息从队列中拉出后就将其发布。
postedSuccessfully = targetBuffer.Post(msg.Value);
Run Code Online (Sandbox Code Playgroud) 我向单个数据库同步提交一系列select语句(查询 - 数千个),并DataTable为每个查询返回一个语句(注意:该程序仅在运行时了解其正在扫描的数据库模式,因此指某东西的用途DataTables)。该程序在客户端计算机上运行并连接到远程计算机上的数据库。运行这么多查询需要很长时间。因此,假设异步或并行执行它们会加快速度,我正在探索TPL Dataflow (TDF)。我想使用该TDF库,因为它似乎可以处理与编写多线程代码相关的所有问题,否则需要手动完成。
显示的代码基于http://blog.i3arnon.com/2016/05/23/tpl-dataflow/。它很小,只是帮助我理解 的基本操作TDF。请知道我已经阅读了很多博客并编写了很多迭代代码来尝试解决这个问题。
尽管如此,在当前的迭代中,我有一个问题:
问题
代码位于一个button click方法内部(用户使用 UI 选择一台机器、一个 SQL 实例和一个数据库,然后开始扫描)。带有运算符的两行在await构建时返回错误:The 'await' operator can only be used within an async method. Consider marking this method with the 'async' modifier and changing its return type to 'Task'。我无法更改按钮单击方法的返回类型。我是否需要以某种方式将button click方法与async-await代码隔离?
问题
尽管我找到了描述 的基础知识的漂亮文章TDF,但我找不到如何获取每次调用所产生的输出TransformBlock(即 a DataTable)的示例。虽然我想提交查询async,但我确实需要阻塞,直到提交的所有查询都TransformBlock完成。在所有查询完成之前,如何获取and 块 …