标签: system.reactive

使用Rx Repeat()和Replay()缓存并重新启动DNS查询

我是一个Rx新手,所以我希望你能忍受我.作为我自己的练习,也可能是我可以为同事演示的样本,我为Dns.BeginGetHostEntry()/ EndGetHostEntry()完成了两个包装类:DnsResolver和DnsResolverRx.

每个类都有一个公共静态方法:

void Resolve(string host, Action<IPHostEntry> getResult, Control context = null);
Run Code Online (Sandbox Code Playgroud)

...以及使其变得有趣的一些其他要求:1.如果提供了上下文,则必须在关联的线程2上调用getResult.对于MaxResultAge秒,将缓存同一主机的先前结果.

非Rx版本工作正常,但与此问题并不真正相关.Rx版本如下所示:

class DnsResolverRx
{
  static Func<string, IObservable<IPHostEntry>> _resolver = Observable.FromAsyncPattern<string, IPHostEntry>(Dns.BeginGetHostEntry, Dns.EndGetHostEntry);

  public static void Resolve(string host, Action<IPHostEntry> setResult, Control context = null)
  {
    IObservable<IPHostEntry> result;
    result = _cache.GetOrCreateValue( // a trivial TryGetValue wrapper
      host,
      () => _resolver(host)
        .Do(e => Debug.WriteLine("resolved"))
        .Repeat()
        .Do(e => Debug.WriteLine("repeated"))
        .Replay(MaxResultAge)
        .RefCount()
    );

    result = result.Take(1); // each request needs only 1 result

    if (context != null)
      result = result.ObserveOn(context);

    result.Subscribe(
      entry …
Run Code Online (Sandbox Code Playgroud)

system.reactive

3
推荐指数
1
解决办法
1500
查看次数

Rx:运算符,用于从Observable流中获取第一个和最近的值

对于基于Rx的变更跟踪解决方案,我需要一个可以在可观察序列中获取第一个和最新项目的运算符.

我如何编写一个生成以下大理石图的Rx运算符(注意:括号仅用于排列项目...我不确定如何最好地在文本中表示这一点):

     xs:---[a  ]---[b  ]-----[c  ]-----[d  ]---------|
desired:---[a,a]---[a,b]-----[a,c]-----[a,d]---------| 
Run Code Online (Sandbox Code Playgroud)

system.reactive

3
推荐指数
1
解决办法
3415
查看次数

ReactiveExtensions Observable FromAsync调用两次Function

好的,试着去理解Rx,有点迷失在这里.

FromAsyncPattern现在已被弃用,所以我从这里拿到了示例(使用Rx轻松完成任务),它可以工作,我只是做了一些更改,而不是等待等待观察和订阅.....

我不明白的是为什么称为SumSquareRoots函数的两倍?

 var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
                                      .Timeout(TimeSpan.FromSeconds(5));

            res.Subscribe(y => Console.WriteLine(y));

            res.Wait();


class Program
{
    static void Main(string[] args)
    {
        Samples();
    }

    static void Samples()
    {
        var x = 100000000;

        try
        {
            var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
                                      .Timeout(TimeSpan.FromSeconds(5));

            res.Subscribe(y => Console.WriteLine(y));

            res.Wait();
        }
        catch (TimeoutException)
        {
            Console.WriteLine("Timed out :-(");
        }
    }

    static Task<double> SumSquareRoots(long count, CancellationToken ct)
    {
        return Task.Run(() =>
        {
            var res = 0.0;
            Console.WriteLine("Why I'm called twice");
            for (long i …
Run Code Online (Sandbox Code Playgroud)

c# task system.reactive

3
推荐指数
1
解决办法
3344
查看次数

如何使用CancellationToken支持将IObservable <T>转换为RX中的列表

我有一个方法返回IObservable<long>我从async方法调用的方法.我想将此转换为正常List<long>,但如果我CancellationToken发出信号,则取消该操作.

我想做这样的事情:

List<long> result = await Foo().ToList(myCancellationToken);
Run Code Online (Sandbox Code Playgroud)

完成此任务的正确(和最简单)方法是什么?返回an 的ToList()扩展方法并不带参数.IObservable<T>IObservable<List<T>>CancellationToken

.net system.reactive .net-4.5 cancellation-token

3
推荐指数
1
解决办法
1689
查看次数

如何在ReactiveUI中使用WhenAny(...)合并多个observable?

我有一个问题,这是对本网站提出的以下问题的扩展.

当返回类型不重要时,是否有更优雅的方法来合并observable?

我有一个IObservable<Unit>(比方说X),一个反应集合(Y)和一个属性(Z).返回类型并不重要.我只想在任何这些改变时订阅.

我知道如何观察所有3并Subscribe使用Observable.Merge如下.

Observable.Merge(X, Y.Changed, ObservableForProperty(Z).Select(_ => Unit.Default)).Subscribe(..)
Run Code Online (Sandbox Code Playgroud)

它有效.

但是,当我尝试使用WhenAny(...,....,....).Subscribe()时,我的X更改时不会触发订阅.做上述使用WhenAny(...)而不是Observable.Merge(..)?? 的语法是什么?

我喜欢使用,WhenAny(....)因为我ReactiveUI在其他地方使用.

示例:假设我有一个派生自ReactiveObject以下属性的类.

public class AnotherVM : ReactiveObject
{
    public bool IsTrue
    {
        get { return this.isTrue; }
        set { this.RaiseAndSetIfChanged(x => x.isTrue, ref this.isTrue, value); }
    }

    public IObservable<Unit> Data
    {
        get { return this.data; }
    }

    public ReactiveCollection MyCol
    {
       get …
Run Code Online (Sandbox Code Playgroud)

system.reactive observable reactiveui

3
推荐指数
1
解决办法
4084
查看次数

CompositeDisposable-确定性顺序?

此问题引用名称空间中的CompositeDisposableSystem.Reactive.Disposables

对其成员进行CompositeDisposable呼叫的顺序是否Dispose确定?

以下实验产生了list ['a', 'b'],但是我在文档中看不到保证一定顺序的任何东西。

var result = new List<char>();
var disposable = new CompositeDisposable(
    Disposable.Create(() => result.Add('a')),
    Disposable.Create(() => result.Add('b')));
disposable.Dispose();
Run Code Online (Sandbox Code Playgroud)

.net c# system.reactive

3
推荐指数
1
解决办法
1952
查看次数

从IObservable <T>到Task

所以情况就是这样.假设某个地方我正在填写一个集合.每次添加元素时,IObservable都会为其订阅者调用OnNext.

现在,将有一个点将填充集合.(我正在读一些东西,我读完了......无论如何).此时,在订阅者上调用OnComplete().

但是,用户不会遵守此方法.他宁愿打电话给他等待的异步方法......他并不关心他读的东西,他只关心他读完了.

基本上,我想从IObservable中获取一个Task,它在IObservable调用OnComplete()时返回给它的订阅者.我特别希望用户不要使用观察者,但只是要知道在他的等待调用之后发生的任何事情都会在收集完成后发生.

也许ToTask()方法可以解决问题?我无法通过文档说出来.

reactive-programming task-parallel-library system.reactive observable

3
推荐指数
1
解决办法
221
查看次数

如何通过响应式扩展将一个事件拆分为多个事件?

如何在响应式扩展流中处理单个事件并将其拆分为同一个流中的多个事件?

我有一个序列来检索json数据,这是一个顶层的数组.在json数据被解码的那一点上,我想获取该数组中的每个元素并继续沿着流传递这些元素.

这是一个我希望存在的虚构函数的例子(但名字较短!).它是在Python中,但我认为它很简单,它应该对其他Rx程序员清晰.

# Smallest possible example
from rx import Observable
import requests
stream = Observable.just('https://api.github.com/users')
stream.map(requests.get) \
      .map(lambda raw: raw.json()) \
      .SPLIT_ARRAY_INTO_SEPARATE_EVENTS() \
      .subscribe(print)
Run Code Online (Sandbox Code Playgroud)

换句话说,我想像这样进行转换:

From:
# --[a,b,c]--[d,e]--|->
To:
# --a-b-c-----d-e---|->
Run Code Online (Sandbox Code Playgroud)

arrays split stream reactive-programming system.reactive

3
推荐指数
1
解决办法
231
查看次数

Subject <T>和ReplaySubject <T>之间的主要区别是什么?

System.Reactive.Subjects.Subject<T>System.Reactive.Subjects.ReplaySubject<T>课程有什么区别?

一个不是从另一个派生的,但它们具有相同的描述并在MSDN中实现相同的接口.

c# system.reactive

3
推荐指数
1
解决办法
396
查看次数

Reactive的"缓冲直到安静"行为?

我的问题有点像Nagle算法创建的问题,但不完全正确.我想要的是将OnNext通知缓冲IObservable<T>到一系列IObservable<IList<T>>s中,如下所示:

  1. 第一个T通知到达时,将其添加到缓冲区并开始倒计时
  2. 如果T在倒计时到期之前收到另一个通知,请将其添加到缓冲区并重新开始倒计时
  3. 一旦倒计时结束(即生产者已经沉默了一段时间),将所有缓冲的T通知转发为单个聚合IList<T>通知.
  4. 如果缓冲区大小在倒计时到期之前超过某个最大值,则无论如何都要发送它.

IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler) 看起来很有希望,但它似乎定期发送聚合通知,而不是"在第一个通知到达时启动计时器,当其他通知到达时重启它"我想要的行为,并且它还发送一个空列表在如果没有从下面生成通知,则每个时间窗口的结束.

希望放弃任何的T通知; 只是缓冲它们.

有这样的事情存在,还是我需要自己编写?

c# reactive-programming system.reactive

3
推荐指数
2
解决办法
784
查看次数