小编use*_*350的帖子

当数据流比订阅者可以消耗的速度快时,Rx如何​​表现?

我很高兴在生产应用中使用Rx; 我将收听来自不同频道的传入通知更新.

我将在此流的顶部编写Rx查询,我将使用.Window()运算符进行限制.订阅者(在我的例子中是ActionBlock)将以阻塞方式处理此数据; (即它不会从ActionBlock中生成任务).请记住,如果数据的速度比我的订阅者可以消耗的速度快得多,那么传入数据会发生什么.Rx查询是否在内部使用任何缓冲区; 它会溢出吗?

c# system.reactive

10
推荐指数
2
解决办法
3075
查看次数

如何在TPL中实现连续运行的数据流块?

我使用BufferBlock和ActionBlock进行生产者/消费者数据流块设置,它在Console应用程序中运行良好;

将所有项目添加到BurfferBlock并将BufferBlock与其他Action项目链接后; 它运作良好.

现在我想使用内部服务,此数据流块管道将始终处于启动状态,并且当消息通过外部事件可用时,它将进入缓冲区块并开始处理.我怎样才能做到这一点?

到目前为止,我在下面做了:

public void SetupPipeline()
{
    FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

    BufferBlock = new BufferBlock<WorkItem>();

    GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
    GroupingDataflowBlockOptions.Greedy = true;
    GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
    CancellationTokenSource = new CancellationTokenSource();
    CancellationToken = CancellationTokenSource.Token;
    GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
    BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);

    ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
        ProcessWorkItems(WorkItems.ToList<WorkItem>()),
        new ExecutionDataflowBlockOptions
      {
          CancellationToken = CancellationToken
      });

    Timer = new Timer(_ =>
            BatchBlock.TriggerBatch()
        );

    TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
    {
        Timer.Change(TimerInterval, Timeout.Infinite); …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library .net-4.5 tpl-dataflow

6
推荐指数
1
解决办法
4214
查看次数