标签: system.reactive

RX for .NET - 从 Disposable.Create 中调用observer.OnCompleted 时什么也没有发生

我正在使用 C# 中的 .NET 的 RX 库。谁能向我解释为什么“observer.OnCompleted()”方法在以下代码中不执行任何操作:

var observableStream = Observable.Create<CustomMessage>(
       (observer) =>
           {
               CustomMessage cm = new CustomMessage();
               CustomMessage.Subscribe(observer.OnNext);

               return Disposable.Create(
               () =>
                   {
                       Console.WriteLine("Disposing...");
                       CustomMessage.Unsubscribe(observer.OnNext);                          
                       observer.OnCompleted();                //***Nothing happens here***
                   }
               );
       });

    //IObserver.OnException()
    public override void OnException(Exception e)
    {
        Console.WriteLine("Exception occurred - " + e.Message);
    }

    //IObserver.OnComplete()
    public override void OnUnsubscribe()
    {
        Console.WriteLine("Unsubscribed...");            
    }

    //IObserver.OnNext()
    public override void HandleNextMsg(IRVMessage msg)
    {
        Console.WriteLine("Instance received a message");
    }

IDisposable myDisposable = observableStream.Subscribe(HandleNextMsg, OnException, OnUnsubscribe);

//At some later point....
myDisposable.Dispose();
Run Code Online (Sandbox Code Playgroud)

该代码旨在订阅 …

c# system.reactive observable

1
推荐指数
1
解决办法
4589
查看次数

创建后推送新值

阅读 IntroToRx 网站,它不鼓励使用Subject,而是使用 Observable.Create 辅助方法。

正如我所看到的,OnNext 方法只能在 subscribe 方法中调用,因为它是我可以访问 Observer 对象的唯一部分。

如果我想在创建新值后推送新值怎么办?我是否“被迫”使用主题?

c# system.reactive

1
推荐指数
1
解决办法
1047
查看次数

如何使用 Observable.Generate 在控制台中输出

为什么下面的代码没有输出?

它是一个控制台应用程序,用于生成一些随机值(我正在学习反应式扩展)。

using System.Reactive.Linq;

static void Main(string[] args)
{   
    var rand = new Random();
    Observable.Generate(
        5.0,
        i => i > 0,
        i => i + rand.NextDouble() - 0.5,
        i => i,
        i => TimeSpan.FromSeconds(0.1)
    ).Subscribe(Console.WriteLine);  
}
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

1
推荐指数
1
解决办法
630
查看次数

.NET 中的反应式扩展:如何提前取消 Timeout()?

目前我有这样的事情:

await Observable.FromEventPattern<WriteConfirmationEventArgs>(
    h => this.WriteCompleted += h,
    h => this.WriteCompleted -= h)
    .Select(x => x.EventArgs.Id)
    .FirstAsync(x => Guid.Equals(x, dto.Id))
    .Timeout(TimeSpan.FromSeconds(10));  
Run Code Online (Sandbox Code Playgroud)

这似乎工作正常。但是,同一个类中发生了一个事件,我需要取消超时;它不应该等待整整 10 秒。

我该怎么做呢?

.net c# system.reactive

1
推荐指数
1
解决办法
620
查看次数

Observable.Join 示例

有人可以提供一个如何将 Observable.Join 与两种不同的可观察类型一起使用的示例吗?到目前为止,
我在这里找到的最好的解释是,左边的可观察对象打开一个窗口,而右边的可观察对象用于在该窗口中查找匹配项,这是有意义的。但这是如何运作的呢?该示例只是使用,我不明白它的作用。对我来说,持续时间选择器听起来像是一个时间跨度,例如缓冲区窗口。leftDurationSelectorrightDurationSelectorPublish().RefCount()

c# system.reactive

1
推荐指数
1
解决办法
1313
查看次数

如何创建一个“Observable”来观察“FileSystemWatcher”中的“Renamed”和“Changed”事件

我想创建一个Reactive Observable,它可以同时观察Renamed和。Changed

我正在监视文件夹中是否有已修改和重命名的文件FileSystemWatcher。我只想在事件稳定下来后处理该事件(因为有时Changed事件可能会引发多次)。所以我选择Throttle.net的功能Reactive Extension

我可以通过观察单个Changed事件来完成这项工作,但无法弄清楚如何ChangedRenamed单个Observable.

这是我为单个Changed事件所做的事情:

var watcher = new FileSystemWatcher();
watcher.Path = "d:\\test";
var observable = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                            ev => watcher.Changed += ev,
                            ev => watcher.Changed -= ev
);

var d = observable.Throttle(TimeSpan.FromSeconds(5))
                    .Subscribe(
                        a => Console.WriteLine($"Watcher type:{a.EventArgs.ChangeType }, file:{a.EventArgs.Name }, sender: {a.Sender}"),
                        ex => Console.WriteLine("observer error:" + ex.ToString()),
                        () => Console.WriteLine("observer complete.")
                );

Run Code Online (Sandbox Code Playgroud)

但是当我想观看这两个事件时,VS 告诉我 …

filesystemwatcher system.reactive .net-core

1
推荐指数
1
解决办法
211
查看次数

如何处理异常并使用 Rx 重复

我正在使用执行IO操作Observable.FromAsync。我想永远重复这个操作。我不明白的是如何处理异常,对它们做一些事情,然后返回我的循环:

我尝试过的:

IObservable<string> ioObs=Observable.FromAsync<string>([something]);  //at each iteration i do an io operation (reading from a socket);
IObservable<string> loop=Observable.Catch(ioObs).Repeat();
loop.Subscribe(
  onNext:x=> Console.Writeline($"Message:{x}"),
  onCompleted: Console.Writeline("Completed"),
  onError: ex=>Console.Writeline($"Error\tReason:{ex.Message}")
);
Run Code Online (Sandbox Code Playgroud)

现在我不明白为什么我的可观察在第一个异常之后结束。我不告诉它继续吗?

我想做的事:

  • 执行IO操作
  • 如果抛出返回一些自定义值
  • 重复循环

如果我的可观察量是可枚举的,我会想要这种行为:

public IAsyncEnumerable<string> EnumerableBehaviour()
{
   while(true)
   {
      try
      {
        string data=await ReadAsync();  //the `FromAsync` delegate
        yield return data;
      }catch(Exception ex)
          yield return "Error";
      {
   }
}
Run Code Online (Sandbox Code Playgroud)

Repeat即使OnError被触发,我如何继续执行?

应该Observable.Catch和如何Observable.Throw结合Observable.Repeat

c# system.reactive observable

1
推荐指数
1
解决办法
1075
查看次数

C# Rx Observable 产生随机结果

考虑以下程序;

class Program
{
    static IObservable<int> GetNumbers()
    {
        var observable = Observable.Empty<int>();
        foreach (var i in Enumerable.Range(1, 10))
        {
            observable = observable.Concat(Observable.FromAsync(() => Task.Run(() =>
            {
                Console.WriteLine($"Producing {i}");
                Thread.Sleep(1000);
                return i;
            })));
        }

        return observable;
    }

    static async Task LogNumbers(IObservable<int> observable)
    {
        var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
        await observable;
        subscription.Dispose();
    }

    static void Main(string[] args)
    {
        LogNumbers(GetNumbers()).Wait();
        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

它产生以下输出

Producing 1
Producing 1
Producing 2
Consuming 1
Producing 2
Producing 3
Consuming 2
Producing 3 …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive observable

1
推荐指数
1
解决办法
192
查看次数

如何订阅 SourceList.Count 并将其转换为 IObservable?

我创建 ValidatableModelBase 类并遇到一些麻烦。我需要订阅 SourceCache 更改并将 Count 集合转换为 IObservable bool 。我该怎么做?

private readonly SourceCache<ValidationResult, string> _results;

public IObservalbe<bool> IsValid { get; }

public ValidatableModelBase()
{
    _results = new SourceCach<ValidationResult, string>(x => x.PropertyName);

    //Doesn't work. I think because i dont .Subscribe() to changes?
    IsValid = _results.Connect().IsEmpty();
}
Run Code Online (Sandbox Code Playgroud)

更新:

HasErrors = collection.CountChanged.Subscribe(x => {Count = x;});
IsValid = this.WhenAnyValie(x => x.HasErrors).Select(x => x == 0);
Run Code Online (Sandbox Code Playgroud)

c# dynamic-data system.reactive reactiveui

1
推荐指数
1
解决办法
871
查看次数

Observable.Do 永远不会触发,即使底层 IObservable 发生变化

我在 ReactiveUI 中获得了一些速成课程,并且System.Reactive.Linq在发现我需要的 UI 库后将其用于所有内容。大多数情况下这似乎很容易理解,但是有一个操作没有做任何事情。

我有一个控件,我需要在两个地方使用它的值。我有一个IObservable<T>代表它的价值,我使用如下:

案例 1:我需要将一个值与另一个 observable 值相结合,从而为另一个 observable 提供一个值。所以我Observable.CombineLatest(myObservable, otherObservable, (m, o) => ProduceValue(m, o))完全按照预期使用此更新生成它 。由此,我知道这myObservable是正确触发更新。

情况 2:我需要在其他地方使用这个值,在不可观察的上下文中。所以:myObservable.Do(v => UpdateViewModelWith(v))这永远不会触发。 我已经通过在 lambda 中放置一个断点并在调试器下运行它来验证这一点。

从案例 1 我知道 observable 正在正确触发。据我了解,observables 在概念上很像 Events,(有一堆机制让它们感觉更像IEnumerables,)并且 Events 完全能够接受多个侦听器,所以事实上它们不应该有两个成为问题。(通过更改设置两个侦听器的顺序进行验证,这不会导致观察到的行为发生变化。)那么什么会导致案例 2 永远不会运行呢?

c# wpf system.reactive reactiveui

1
推荐指数
1
解决办法
71
查看次数