如何在TPL中实现连续运行的数据流块?

use*_*350 6 c# task-parallel-library .net-4.5 tpl-dataflow

我使用BufferBlock和ActionBlock进行生产者/消费者数据流块设置,它在Console应用程序中运行良好;

将所有项目添加到BurfferBlock并将BufferBlock与其他Action项目链接后; 它运作良好.

现在我想使用内部服务,此数据流块管道将始终处于启动状态,并且当消息通过外部事件可用时,它将进入缓冲区块并开始处理.我怎样才能做到这一点?

到目前为止,我在下面做了:

public void SetupPipeline()
{
    FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

    BufferBlock = new BufferBlock<WorkItem>();

    GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
    GroupingDataflowBlockOptions.Greedy = true;
    GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
    CancellationTokenSource = new CancellationTokenSource();
    CancellationToken = CancellationTokenSource.Token;
    GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
    BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);

    ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
        ProcessWorkItems(WorkItems.ToList<WorkItem>()),
        new ExecutionDataflowBlockOptions
      {
          CancellationToken = CancellationToken
      });

    Timer = new Timer(_ =>
            BatchBlock.TriggerBatch()
        );

    TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
    {
        Timer.Change(TimerInterval, Timeout.Infinite);
        logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
        return WorkItem;
    }, new ExecutionDataflowBlockOptions
    {
        CancellationToken = CancellationToken
    });

    BatchBlock.LinkTo(ProcessItems);
    TimingBlock.LinkTo(BatchBlock);
    BufferBlock.LinkTo(TimingBlock);
}
Run Code Online (Sandbox Code Playgroud)

Vot*_*fee 2

您的批量大小由batchblock构造函数中的变量“BoundingCapacity”定义。在以下情况下将过帐批次:

  • 已收到等于批量大小的帖子数量(在构造函数中指定)
  • 批处理块被标记为完成
  • 调用triggerbatch方法

您似乎希望在满足浴槽大小或发生超时时发布批次。如果是这种情况,并且批量大小并不重要,我实际上只需向您拥有的计时器添加一个重复间隔,并使批处理块下游的对象忽略空帖子。

您可能真正想要的,也是最符合数据流编程原理的,是在开始发布一系列项目并在完成或发生超时时完成它时创建一个新的批处理块。如果新的批处理块尚不存在,则新帖子将创建一个新的批处理块。

尝试在仅根据第一个触发器触发的批处理块周围实现超时计时器的问题是,您要么需要计算和验证到 bufferblock 的帖子,要么需要观看来自 bufferblock 的帖子。这两种情况都会造成很多丑陋和/或违反块封装。