使用AsObservable观察TPL数据流块而不消耗消息

the*_*Man 5 .net c# system.reactive tpl-dataflow rx.net

我有一连串的TPL Dataflow块,并希望观察系统内部的进度。

我知道我可以将a TransformBlock塞入要观察的网格中,将其发布到各种进度更新器中,然后将消息原封不动地返回到下一个块。我不喜欢这种解决方案,因为该块纯粹是因为它的副作用而存在,而且我还必须在我想观察的任何地方更改块链接逻辑。

因此,我想知道是否可以ISourceBlock<T>.AsObservable用来观察网格内消息的传递而无需更改它,也无需消耗消息。如果可行的话,这似乎是一种更纯净,更实用的解决方案。

从我对Rx的(有限的)理解中,这意味着我需要可观察的对象是热的而不是冷的,以便我的progress更新程序可以看到该消息但不使用它。并且.Publish().RefCount()似乎是使可观察到的热点的方法。但是,它根本不起作用按预期-而不是要么block2progress接收并消耗每条消息。

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); 
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});

// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();
Run Code Online (Sandbox Code Playgroud)

结果是不确定的,但是我得到的结果有点像这样:

block2:21
progress:22
progress:24
block2:23
progress:25
Run Code Online (Sandbox Code Playgroud)

那么,我是在做错什么,还是由于TPL Dataflow AsObservable的实现方式而使这不可能?

我知道我还可以替换LinkTo之间block1,并block2与可观察/观察员对和可能的工作,但LinkTo与下游BoundedCapacity = 1是整个原因,我首先使用TPL数据流。

编辑: 一些澄清:

  • 我确实打算BoundedCapacity=1在block2中设置。尽管在这个琐碎的示例中没有必要,但是在下游约束的情况下,我发现TPL Dataflow确实有用。
  • 为了阐明我在第二段中拒绝的解决方案,将添加以下链接在block1和block2之间的块:

    var progressBlock = new TransformBlock<int, int>( i => {SomeUpdateProgressMethod(i); return i;});

  • 我还想保持背压,以便如果上游的某个区块正在将工作分配给block1其他同等工人,则block1如果该链已经很忙,它将不会向其发送工作。

Eni*_*ity 5

您的代码的问题在于您正在连接两个block1. 然后,数据流只是为最先到达的消费者提供价值。

因此,您需要将这些值广播到block1另外两个块中,然后才能独立使用它们。

顺便说一句,不要做,.Publish().RefCount()因为它没有按照你的想法去做。它将有效地使一次运行只能被观察,在一次运行期间将允许多个观察者连接并看到相同的值。它与数据源或数据流块如何交互无关。

试试这个代码:

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20);
var block_boadcast = new BroadcastBlock<int>(i => i, new DataflowBlockOptions());
var block_buffer = new System.Threading.Tasks.Dataflow.BufferBlock<int>();
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()));
var obs = block_buffer.AsObservable();
var l1 = block1.LinkTo(block_boadcast);
var l2 = block_boadcast.LinkTo(block2);
var l3 = block_boadcast.LinkTo(block_buffer);

// Progress
obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();
Run Code Online (Sandbox Code Playgroud)

这给了我:

区块2:21
区块2:22
区块2:23
区块2:24
区块2:25
进度:21
进度:22
进度:23
进度:24
进度:25

我想这就是你想要的。

现在,顺便说一句,使用 Rx 可能是一个更好的选择。它比任何 TPL 或数据流选项都更强大且更具声明性。

你的代码可以归结为:

Observable
    .Range(1, 5)
    .Select(i => i + 20)
    .Do(i => Debug.Print("progress:" + i.ToString()));
    .Subscribe(i => Debug.Print("block2:" + i.ToString()));
Run Code Online (Sandbox Code Playgroud)

这几乎给你同样的结果。


Cli*_*int 0

尝试更换:

obs.ForEachAsync(i => Debug.Print("progressBlock:" + i.ToString()));
Run Code Online (Sandbox Code Playgroud)

和:

obs.Subscribe(i => Debug.Print("progressBlock:" + i.ToString()));
Run Code Online (Sandbox Code Playgroud)

我想该ForEachAsync方法没有正确连接/它正在触发,但是异步部分发生了一些奇怪的事情。