我正在使用 C# 中的 .NET 的 RX 库。谁能向我解释为什么“observer.OnCompleted()”方法在以下代码中不执行任何操作:
var observableStream = Observable.Create<CustomMessage>(
(observer) =>
{
CustomMessage cm = new CustomMessage();
CustomMessage.Subscribe(observer.OnNext);
return Disposable.Create(
() =>
{
Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext);
observer.OnCompleted(); //***Nothing happens here***
}
);
});
//IObserver.OnException()
public override void OnException(Exception e)
{
Console.WriteLine("Exception occurred - " + e.Message);
}
//IObserver.OnComplete()
public override void OnUnsubscribe()
{
Console.WriteLine("Unsubscribed...");
}
//IObserver.OnNext()
public override void HandleNextMsg(IRVMessage msg)
{
Console.WriteLine("Instance received a message");
}
IDisposable myDisposable = observableStream.Subscribe(HandleNextMsg, OnException, OnUnsubscribe);
//At some later point....
myDisposable.Dispose();
Run Code Online (Sandbox Code Playgroud)
该代码旨在订阅 …
阅读 IntroToRx 网站,它不鼓励使用Subject,而是使用 Observable.Create 辅助方法。
正如我所看到的,OnNext 方法只能在 subscribe 方法中调用,因为它是我可以访问 Observer 对象的唯一部分。
如果我想在创建新值后推送新值怎么办?我是否“被迫”使用主题?
为什么下面的代码没有输出?
它是一个控制台应用程序,用于生成一些随机值(我正在学习反应式扩展)。
using System.Reactive.Linq;
static void Main(string[] args)
{
var rand = new Random();
Observable.Generate(
5.0,
i => i > 0,
i => i + rand.NextDouble() - 0.5,
i => i,
i => TimeSpan.FromSeconds(0.1)
).Subscribe(Console.WriteLine);
}
Run Code Online (Sandbox Code Playgroud) 目前我有这样的事情:
await Observable.FromEventPattern<WriteConfirmationEventArgs>(
h => this.WriteCompleted += h,
h => this.WriteCompleted -= h)
.Select(x => x.EventArgs.Id)
.FirstAsync(x => Guid.Equals(x, dto.Id))
.Timeout(TimeSpan.FromSeconds(10));
Run Code Online (Sandbox Code Playgroud)
这似乎工作正常。但是,同一个类中发生了一个事件,我需要取消超时;它不应该等待整整 10 秒。
我该怎么做呢?
有人可以提供一个如何将 Observable.Join 与两种不同的可观察类型一起使用的示例吗?到目前为止,
我在这里找到的最好的解释是,左边的可观察对象打开一个窗口,而右边的可观察对象用于在该窗口中查找匹配项,这是有意义的。但这是如何运作的呢?该示例只是使用,我不明白它的作用。对我来说,持续时间选择器听起来像是一个时间跨度,例如缓冲区窗口。leftDurationSelectorrightDurationSelectorPublish().RefCount()
我想创建一个Reactive Observable,它可以同时观察Renamed和。Changed
我正在监视文件夹中是否有已修改和重命名的文件FileSystemWatcher。我只想在事件稳定下来后处理该事件(因为有时Changed事件可能会引发多次)。所以我选择Throttle.net的功能Reactive Extension。
我可以通过观察单个Changed事件来完成这项工作,但无法弄清楚如何Changed在Renamed单个Observable.
这是我为单个Changed事件所做的事情:
var watcher = new FileSystemWatcher();
watcher.Path = "d:\\test";
var observable = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
ev => watcher.Changed += ev,
ev => watcher.Changed -= ev
);
var d = observable.Throttle(TimeSpan.FromSeconds(5))
.Subscribe(
a => Console.WriteLine($"Watcher type:{a.EventArgs.ChangeType }, file:{a.EventArgs.Name }, sender: {a.Sender}"),
ex => Console.WriteLine("observer error:" + ex.ToString()),
() => Console.WriteLine("observer complete.")
);
Run Code Online (Sandbox Code Playgroud)
但是当我想观看这两个事件时,VS 告诉我 …
我正在使用执行IO操作Observable.FromAsync。我想永远重复这个操作。我不明白的是如何处理异常,对它们做一些事情,然后返回我的循环:
我尝试过的:
IObservable<string> ioObs=Observable.FromAsync<string>([something]); //at each iteration i do an io operation (reading from a socket);
IObservable<string> loop=Observable.Catch(ioObs).Repeat();
loop.Subscribe(
onNext:x=> Console.Writeline($"Message:{x}"),
onCompleted: Console.Writeline("Completed"),
onError: ex=>Console.Writeline($"Error\tReason:{ex.Message}")
);
Run Code Online (Sandbox Code Playgroud)
现在我不明白为什么我的可观察在第一个异常之后结束。我不告诉它继续吗?
我想做的事:
如果我的可观察量是可枚举的,我会想要这种行为:
public IAsyncEnumerable<string> EnumerableBehaviour()
{
while(true)
{
try
{
string data=await ReadAsync(); //the `FromAsync` delegate
yield return data;
}catch(Exception ex)
yield return "Error";
{
}
}
Run Code Online (Sandbox Code Playgroud)
Repeat即使OnError被触发,我如何继续执行?
应该Observable.Catch和如何Observable.Throw结合Observable.Repeat?
考虑以下程序;
class Program
{
static IObservable<int> GetNumbers()
{
var observable = Observable.Empty<int>();
foreach (var i in Enumerable.Range(1, 10))
{
observable = observable.Concat(Observable.FromAsync(() => Task.Run(() =>
{
Console.WriteLine($"Producing {i}");
Thread.Sleep(1000);
return i;
})));
}
return observable;
}
static async Task LogNumbers(IObservable<int> observable)
{
var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
await observable;
subscription.Dispose();
}
static void Main(string[] args)
{
LogNumbers(GetNumbers()).Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
Run Code Online (Sandbox Code Playgroud)
它产生以下输出
Producing 1
Producing 1
Producing 2
Consuming 1
Producing 2
Producing 3
Consuming 2
Producing 3 …Run Code Online (Sandbox Code Playgroud) 我创建 ValidatableModelBase 类并遇到一些麻烦。我需要订阅 SourceCache 更改并将 Count 集合转换为 IObservable bool 。我该怎么做?
private readonly SourceCache<ValidationResult, string> _results;
public IObservalbe<bool> IsValid { get; }
public ValidatableModelBase()
{
_results = new SourceCach<ValidationResult, string>(x => x.PropertyName);
//Doesn't work. I think because i dont .Subscribe() to changes?
IsValid = _results.Connect().IsEmpty();
}
Run Code Online (Sandbox Code Playgroud)
更新:
HasErrors = collection.CountChanged.Subscribe(x => {Count = x;});
IsValid = this.WhenAnyValie(x => x.HasErrors).Select(x => x == 0);
Run Code Online (Sandbox Code Playgroud) 我在 ReactiveUI 中获得了一些速成课程,并且System.Reactive.Linq在发现我需要的 UI 库后将其用于所有内容。大多数情况下这似乎很容易理解,但是有一个操作没有做任何事情。
我有一个控件,我需要在两个地方使用它的值。我有一个IObservable<T>代表它的价值,我使用如下:
案例 1:我需要将一个值与另一个 observable 值相结合,从而为另一个 observable 提供一个值。所以我Observable.CombineLatest(myObservable, otherObservable, (m, o) => ProduceValue(m, o))完全按照预期使用此更新生成它
。由此,我知道这myObservable是正确触发更新。
情况 2:我需要在其他地方使用这个值,在不可观察的上下文中。所以:myObservable.Do(v => UpdateViewModelWith(v))。 这永远不会触发。 我已经通过在 lambda 中放置一个断点并在调试器下运行它来验证这一点。
从案例 1 我知道 observable 正在正确触发。据我了解,observables 在概念上很像 Events,(有一堆机制让它们感觉更像IEnumerables,)并且 Events 完全能够接受多个侦听器,所以事实上它们不应该有两个成为问题。(通过更改设置两个侦听器的顺序进行验证,这不会导致观察到的行为发生变化。)那么什么会导致案例 2 永远不会运行呢?
system.reactive ×10
c# ×9
observable ×3
reactiveui ×2
.net ×1
.net-core ×1
dynamic-data ×1
wpf ×1