相关疑难解决方法(0)

冷热观察:有"热"和"冷"运营商吗?

我回顾了以下SO问题: 什么是热和冷可观测量?

总结一下:

  • 当一个cold observable有一个观察者使用它时,它会发出它的值,即观察者接收的值序列与订阅时间无关.所有观察者都将使用相同的值序列.
  • 热的observable独立于其订阅发出值,即观察者接收的值是订阅时间的函数.

然而,我觉得热和冷仍然是混乱的根源.所以这是我的问题:

  • 默认情况下所有rx可观察量是否都是冷的(主题除外)?

    我经常读到事件是热观察的典型隐喻,但我也读到这Rx.fromEvent(input, 'click')是一个冷可观察的(?).

  • 是否有什么/哪些Rx运算符将冷观测值转换为热观测值(除了publish和之外share)?

    例如,它如何与Rx运算符一起使用withLatestFrom?让我们cold$成为一个冷酷的观察者.会sth$.withLatestFrom(cold$,...)是一个热门观察?

    或者,如果我不sth1$.withLatestFrom(cold$,...), sth2$.withLatestFrom(cold$,...)和订阅sth1sth2,将我总是看到两个相同的值sth

  • 我认为Rx.fromEvent会产生冷的可观测量,但事实并非如此,正如其中一个答案所述.但是,我仍然对此行为感到困惑:codepen.io/anon/pen/NqQMJR?editors=101.不同的订阅从同一个observable获得不同的值.click事件不是共享的吗?

javascript reactive-programming rxjs rxjs5 angular

60
推荐指数
2
解决办法
1万
查看次数

Rx中的IConnectableObservables

有人可以解释Observable和ConnectableObservable之间的区别吗?Rx Extensions文档非常稀疏,我不明白ConnectableObservable在什么情况下是有用的.

此类用于Replay/Prune方法.

.net reactive-programming system.reactive

38
推荐指数
1
解决办法
3098
查看次数

冷观察的用例是什么?

我理解冷热观察的区别,但我总是看到人们使用热的观察而不是冷; 事实上,如果有人意外地使用冷观察,那么它被认为是一个错误,因为它通常是不良行为的原因.

在热门的情况下,您更喜欢或使用冷观察的情况是什么?

javascript observable rxjs

16
推荐指数
2
解决办法
495
查看次数

当数据流比订阅者可以消耗的速度快时,Rx如何​​表现?

我很高兴在生产应用中使用Rx; 我将收听来自不同频道的传入通知更新.

我将在此流的顶部编写Rx查询,我将使用.Window()运算符进行限制.订阅者(在我的例子中是ActionBlock)将以阻塞方式处理此数据; (即它不会从ActionBlock中生成任务).请记住,如果数据的速度比我的订阅者可以消耗的速度快得多,那么传入数据会发生什么.Rx查询是否在内部使用任何缓冲区; 它会溢出吗?

c# system.reactive

10
推荐指数
2
解决办法
3075
查看次数

如何分离 IObservable 和 IObserver

更新:查看底部的示例

我需要在课间发消息。发布者将无限循环,调用一些方法来获取数据,然后将该调用的结果传递给OnNext. 可以有很多订阅者,但应该只有一个 IObservable 和一个长期运行的任务。这是一个实现。

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        private static string GetSomeData() => "Hi";

        [TestMethod]
        public async Task RunMessagingAsync()
        {
            var subject = new Subject<string>();

            //Create a class and inject the subject as IObserver
            new Publisher(subject);

            //Create a class and inject the subject as IObservable
            new Subscriber(subject, 1.ToString());
            new Subscriber(subject, 2.ToString());
            new Subscriber(subject, 3.ToString());

            //Run the loop for 3 seconds
            await …
Run Code Online (Sandbox Code Playgroud)

c# publish-subscribe reactive-programming system.reactive reactiveui

6
推荐指数
1
解决办法
327
查看次数

RXScala中的热和冷可观测量有什么区别?

我知道热和冷可观察量之间的差异之前已经在C#的上下文中讨论了Stack Overflow,但我根本不了解C#并且不理解Lee Campbell所指的代码示例.

我在Scala工作,使用RXScala库.什么是Scala中的冷热可观测量,以及它们如何使用RXScala实现?

scala system.reactive

5
推荐指数
1
解决办法
458
查看次数

将IObservable <Task <T >>解包到IObservable <T>中

有没有办法解开的IObservable<Task<T>>进入IObservable<T>保持同样的事件顺序,这样的吗?

Tasks:  ----a-------b--c----------d------e---f---->
Values: -------A-----------B--C------D-----E---F-->
Run Code Online (Sandbox Code Playgroud)

假设我有一个消耗流消息的桌面应用程序,其中一些需要大量的后处理:

IObservable<Message> streamOfMessages = ...;

IObservable<Task<Result>> streamOfTasks = streamOfMessages
    .Select(async msg => await PostprocessAsync(msg));

IObservable<Result> streamOfResults = ???; // unwrap streamOfTasks
Run Code Online (Sandbox Code Playgroud)

我想象有两种处理方式.

首先,我可以订阅streamOfTasks使用异步事件处理程序:

streamOfTasks.Subscribe(async task =>
{
    var result = await task;
    Display(result);
});
Run Code Online (Sandbox Code Playgroud)

其次,我可以转换streamOfTasks使用Observable.Create,像这样:

var streamOfResults =
    from task in streamOfTasks
    from value in Observable.Create<T>(async (obs, cancel) =>
    {
        var v = await task;
        obs.OnNext(v);

        // TODO: don't know when to call obs.OnComplete()
    }) …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library system.reactive .net-4.5 rx.net

5
推荐指数
1
解决办法
342
查看次数