我很高兴在生产应用中使用Rx; 我将收听来自不同频道的传入通知更新.
我将在此流的顶部编写Rx查询,我将使用.Window()运算符进行限制.订阅者(在我的例子中是ActionBlock)将以阻塞方式处理此数据; (即它不会从ActionBlock中生成任务).请记住,如果数据的速度比我的订阅者可以消耗的速度快得多,那么传入数据会发生什么.Rx查询是否在内部使用任何缓冲区; 它会溢出吗?
我使用BufferBlock和ActionBlock进行生产者/消费者数据流块设置,它在Console应用程序中运行良好;
将所有项目添加到BurfferBlock并将BufferBlock与其他Action项目链接后; 它运作良好.
现在我想使用内部服务,此数据流块管道将始终处于启动状态,并且当消息通过外部事件可用时,它将进入缓冲区块并开始处理.我怎样才能做到这一点?
到目前为止,我在下面做了:
public void SetupPipeline()
{
FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
BufferBlock = new BufferBlock<WorkItem>();
GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
GroupingDataflowBlockOptions.Greedy = true;
GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
CancellationTokenSource = new CancellationTokenSource();
CancellationToken = CancellationTokenSource.Token;
GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);
ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
ProcessWorkItems(WorkItems.ToList<WorkItem>()),
new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});
Timer = new Timer(_ =>
BatchBlock.TriggerBatch()
);
TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
{
Timer.Change(TimerInterval, Timeout.Infinite); …Run Code Online (Sandbox Code Playgroud)