the*_*Man 5 .net c# system.reactive tpl-dataflow rx.net
我有一连串的TPL Dataflow块,并希望观察系统内部的进度。
我知道我可以将a TransformBlock塞入要观察的网格中,将其发布到各种进度更新器中,然后将消息原封不动地返回到下一个块。我不喜欢这种解决方案,因为该块纯粹是因为它的副作用而存在,而且我还必须在我想观察的任何地方更改块链接逻辑。
因此,我想知道是否可以ISourceBlock<T>.AsObservable用来观察网格内消息的传递而无需更改它,也无需消耗消息。如果可行的话,这似乎是一种更纯净,更实用的解决方案。
从我对Rx的(有限的)理解中,这意味着我需要可观察的对象是热的而不是冷的,以便我的progress更新程序可以看到该消息但不使用它。并且.Publish().RefCount()似乎是使可观察到的热点的方法。但是,它根本不起作用按预期-而不是要么block2或progress接收并消耗每条消息。
// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});
// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));
// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
block1.Post(v);
}
block1.Complete();
Run Code Online (Sandbox Code Playgroud)
结果是不确定的,但是我得到的结果有点像这样:
block2:21
progress:22
progress:24
block2:23
progress:25
Run Code Online (Sandbox Code Playgroud)
那么,我是在做错什么,还是由于TPL Dataflow AsObservable的实现方式而使这不可能?
我知道我还可以替换LinkTo之间block1,并block2与可观察/观察员对和可能的工作,但LinkTo与下游BoundedCapacity = 1是整个原因,我首先使用TPL数据流。
编辑: 一些澄清:
BoundedCapacity=1在block2中设置。尽管在这个琐碎的示例中没有必要,但是在下游约束的情况下,我发现TPL Dataflow确实有用。为了阐明我在第二段中拒绝的解决方案,将添加以下链接在block1和block2之间的块:
var progressBlock = new TransformBlock<int, int>( i => {SomeUpdateProgressMethod(i); return i;});
我还想保持背压,以便如果上游的某个区块正在将工作分配给block1其他同等工人,则block1如果该链已经很忙,它将不会向其发送工作。
您的代码的问题在于您正在连接两个block1. 然后,数据流只是为最先到达的消费者提供价值。
因此,您需要将这些值广播到block1另外两个块中,然后才能独立使用它们。
顺便说一句,不要做,.Publish().RefCount()因为它没有按照你的想法去做。它将有效地使一次运行只能被观察,在一次运行期间将允许多个观察者连接并看到相同的值。它与数据源或数据流块如何交互无关。
试试这个代码:
// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20);
var block_boadcast = new BroadcastBlock<int>(i => i, new DataflowBlockOptions());
var block_buffer = new System.Threading.Tasks.Dataflow.BufferBlock<int>();
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()));
var obs = block_buffer.AsObservable();
var l1 = block1.LinkTo(block_boadcast);
var l2 = block_boadcast.LinkTo(block2);
var l3 = block_boadcast.LinkTo(block_buffer);
// Progress
obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));
// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
block1.Post(v);
}
block1.Complete();
Run Code Online (Sandbox Code Playgroud)
这给了我:
区块2:21 区块2:22 区块2:23 区块2:24 区块2:25 进度:21 进度:22 进度:23 进度:24 进度:25
我想这就是你想要的。
现在,顺便说一句,使用 Rx 可能是一个更好的选择。它比任何 TPL 或数据流选项都更强大且更具声明性。
你的代码可以归结为:
Observable
.Range(1, 5)
.Select(i => i + 20)
.Do(i => Debug.Print("progress:" + i.ToString()));
.Subscribe(i => Debug.Print("block2:" + i.ToString()));
Run Code Online (Sandbox Code Playgroud)
这几乎给你同样的结果。
尝试更换:
obs.ForEachAsync(i => Debug.Print("progressBlock:" + i.ToString()));
Run Code Online (Sandbox Code Playgroud)
和:
obs.Subscribe(i => Debug.Print("progressBlock:" + i.ToString()));
Run Code Online (Sandbox Code Playgroud)
我想该ForEachAsync方法没有正确连接/它正在触发,但是异步部分发生了一些奇怪的事情。
| 归档时间: |
|
| 查看次数: |
808 次 |
| 最近记录: |