我回顾了以下SO问题: 什么是热和冷可观测量?
总结一下:
然而,我觉得热和冷仍然是混乱的根源.所以这是我的问题:
默认情况下所有rx可观察量是否都是冷的(主题除外)?
我经常读到事件是热观察的典型隐喻,但我也读到这Rx.fromEvent(input, 'click')是一个冷可观察的(?).
是否有什么/哪些Rx运算符将冷观测值转换为热观测值(除了publish和之外share)?
例如,它如何与Rx运算符一起使用withLatestFrom?让我们cold$成为一个冷酷的观察者.会sth$.withLatestFrom(cold$,...)是一个热门观察?
或者,如果我不sth1$.withLatestFrom(cold$,...), sth2$.withLatestFrom(cold$,...)和订阅sth1和sth2,将我总是看到两个相同的值sth?
我认为Rx.fromEvent会产生冷的可观测量,但事实并非如此,正如其中一个答案所述.但是,我仍然对此行为感到困惑:codepen.io/anon/pen/NqQMJR?editors=101.不同的订阅从同一个observable获得不同的值.click事件不是共享的吗?
有人可以解释Observable和ConnectableObservable之间的区别吗?Rx Extensions文档非常稀疏,我不明白ConnectableObservable在什么情况下是有用的.
此类用于Replay/Prune方法.
我理解冷热观察的区别,但我总是看到人们使用热的观察而不是冷; 事实上,如果有人意外地使用冷观察,那么它被认为是一个错误,因为它通常是不良行为的原因.
在热门的情况下,您更喜欢或使用冷观察的情况是什么?
我很高兴在生产应用中使用Rx; 我将收听来自不同频道的传入通知更新.
我将在此流的顶部编写Rx查询,我将使用.Window()运算符进行限制.订阅者(在我的例子中是ActionBlock)将以阻塞方式处理此数据; (即它不会从ActionBlock中生成任务).请记住,如果数据的速度比我的订阅者可以消耗的速度快得多,那么传入数据会发生什么.Rx查询是否在内部使用任何缓冲区; 它会溢出吗?
更新:查看底部的示例
我需要在课间发消息。发布者将无限循环,调用一些方法来获取数据,然后将该调用的结果传递给OnNext. 可以有很多订阅者,但应该只有一个 IObservable 和一个长期运行的任务。这是一个实现。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
[TestMethod]
public async Task RunMessagingAsync()
{
var subject = new Subject<string>();
//Create a class and inject the subject as IObserver
new Publisher(subject);
//Create a class and inject the subject as IObservable
new Subscriber(subject, 1.ToString());
new Subscriber(subject, 2.ToString());
new Subscriber(subject, 3.ToString());
//Run the loop for 3 seconds
await …Run Code Online (Sandbox Code Playgroud) c# publish-subscribe reactive-programming system.reactive reactiveui
我知道热和冷可观察量之间的差异之前已经在C#的上下文中讨论了Stack Overflow,但我根本不了解C#并且不理解Lee Campbell所指的代码示例.
我在Scala工作,使用RXScala库.什么是Scala中的冷热可观测量,以及它们如何使用RXScala实现?
有没有办法解开的IObservable<Task<T>>进入IObservable<T>保持同样的事件顺序,这样的吗?
Tasks: ----a-------b--c----------d------e---f---->
Values: -------A-----------B--C------D-----E---F-->
Run Code Online (Sandbox Code Playgroud)
假设我有一个消耗流消息的桌面应用程序,其中一些需要大量的后处理:
IObservable<Message> streamOfMessages = ...;
IObservable<Task<Result>> streamOfTasks = streamOfMessages
.Select(async msg => await PostprocessAsync(msg));
IObservable<Result> streamOfResults = ???; // unwrap streamOfTasks
Run Code Online (Sandbox Code Playgroud)
我想象有两种处理方式.
首先,我可以订阅streamOfTasks使用异步事件处理程序:
streamOfTasks.Subscribe(async task =>
{
var result = await task;
Display(result);
});
Run Code Online (Sandbox Code Playgroud)
其次,我可以转换streamOfTasks使用Observable.Create,像这样:
var streamOfResults =
from task in streamOfTasks
from value in Observable.Create<T>(async (obs, cancel) =>
{
var v = await task;
obs.OnNext(v);
// TODO: don't know when to call obs.OnComplete()
}) …Run Code Online (Sandbox Code Playgroud) c# ×3
javascript ×2
rxjs ×2
.net ×1
.net-4.5 ×1
angular ×1
observable ×1
reactiveui ×1
rx.net ×1
rxjs5 ×1
scala ×1