TPL DataFlow-按持续时间或阈值进行批处理

Ash*_*tia 5 batching task-parallel-library tpl-dataflow

我已经使用TPL数据流实现了producer..consumer模式。用例是代码从Kafka总线读取消息。为了提高效率,我们需要在访问数据库时分批处理消息。

TPL数据流中是否有方法可以保留消息并在达到大小或持续时间阈值时触发?

例如,当前实现将消息从队列中拉出后就将其发布。

    postedSuccessfully = targetBuffer.Post(msg.Value);
Run Code Online (Sandbox Code Playgroud)

Pan*_*vos 6

通过System.Reactive尤其是Buffer操作符,已经可以通过计数和持续时间进行缓冲。缓冲区收集传入事件,直到达到所需的计数或其时间跨度到期。

数据流块旨在与 System.Reactive 一起使用。可以使用DataflowBlock.AsObservable()AsObserver()扩展方法转换为 Observables 和 Observers 。

这使得构建缓冲块非常容易:

public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
    var inBlock = new BufferBlock<TIn>();
    var outBlock = new BufferBlock<IList<TIn>>();

    var outObserver=outBlock.AsObserver();
    inBlock.AsObservable()
            .Buffer(timeSpan, count)
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(outObserver);

    return DataflowBlock.Encapsulate(inBlock, outBlock);

}
Run Code Online (Sandbox Code Playgroud)

此方法使用两个缓冲块来缓冲输入和输出。 Buffer()当批处理已满或时间跨度到期时,从输入块(observable)读取并写入输出块(观察者)。

默认情况下,Rx 工作在当前线程上。通过调用ObserveOn(TaskPoolScheduler.Default)我们告诉它在任务池线程上处理数据。

例子

此代码为 5 个项目或 1 秒创建一个缓冲区块。它首先发布 7 个项目,等待 1.1 秒然后发布另外 7 个项目。每个批次都与线程 ID 一起写入控制台:

static async Task Main(string[] args)
{
    //Build the pipeline
    var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
    bufferBlock.LinkTo(printBlock, options);

    //Start the messages
    Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");

    for (int i=0;i<7;i++)
    {
        bufferBlock.Post(i.ToString());
    }
    await Task.Delay(1100);
    for (int i=7; i < 14; i++)
    {
        bufferBlock.Post(i.ToString());
    }
    bufferBlock.Complete();
    Console.WriteLine($"Finishing");
    await bufferBlock.Completion;
    Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadKey();
}

static void printOut(IEnumerable<string> items)
{
    var line = String.Join(",", items);
    Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}
Run Code Online (Sandbox Code Playgroud)

输出是:

Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6
Run Code Online (Sandbox Code Playgroud)

  • 它已经可用了。您可以通过“BoundedCapacity”选项指定任何块上的绑定,并使用“await block.SendAsync”来发布到它。如果块已满,`SendAsync` 将异步等待。在同样的情况下,“Post”将返回“false”。如果设置 `BoundedCapacity=1`,该方法仅在处理前一个缓冲区时才会发布新缓冲区 (2认同)

JSt*_*ard 4

虽然没有开箱即用的超时,但TriggerBatch只要下游管道等待批处理的时间足够长,您就可以连接一个计时器。然后当批次流过时重置计时器。他们BatchBlock将为您处理剩下的事情。

例如,此示例已配置为每次都会产生 1 的批处理大小,即使批处理块通常会等待 10 个元素。超时强制清空当前存储在BatchBlock

public class BatchBlockExample
{
    [Test]
    public async Task BatchBlockWithTimeOut()
    {
        var batchBlock = new BatchBlock<int>(10);

        var timeOut = TimeSpan.FromSeconds(1);
        var timeOutTimer = new System.Timers.Timer(timeOut.TotalMilliseconds);
        timeOutTimer.Elapsed += (s, e) => batchBlock.TriggerBatch();            

        var actionBlock = new ActionBlock<IEnumerable<int>>(x =>
        {
            //Reset the timeout since we got a batch
            timeOutTimer.Stop();
            timeOutTimer.Start();
            Console.WriteLine($"Batch Size: {x.Count()}");
        });

        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        timeOutTimer.Start();

        foreach(var item in Enumerable.Range(0, 5))
        {
            await Task.Delay(2000);
            await batchBlock.SendAsync(item);
        }

        batchBlock.Complete();
        await actionBlock.Completion;
    }
}
Run Code Online (Sandbox Code Playgroud)

输出:

Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
Run Code Online (Sandbox Code Playgroud)