我从 Observable.FromEventPattern 得到一个 IObservable,如下所示:
SomeObject target = new SomeObject();
string eventName = "SomeEvent";
IObservable<T> obs = Observable.FromEventPattern<T>(target, eventName);
Run Code Online (Sandbox Code Playgroud)
据我了解,FromEventPattern 调用将自动为我生成添加/删除事件处理程序。但是处理程序何时真正被添加/删除?
我假设在订阅 IObservable 时添加了处理程序。处理订阅者时,处理程序是否也会自动取消注册?
Visual Studio 2012 有 nuget 版本 2.8.60318.667,与 Reactive.Extensions 3.1.1 不兼容。
如果将 System.Reactive.* 包手动复制到我的解决方案的包目录中,当我从 Visual Studio UI 启动包管理器并浏览已安装的包时:而不是 habing 已安装包的列表,将显示以下错误:
“System.Reactive.Core”已经为“System.Reactive.Interfaces”定义了一个依赖项。错误
如果我使用 de package manager 命令安装包: install-package System.Reactive 会显示以下错误:“System.Reactive 3.1.1”包需要 NuGet 客户端版本“2.8.1”或更高版本,但当前的 NuGet 版本是'2.8.60318.667'。
我想知道是否有任何方法可以在 .NET 的 Reactive Extensions 中实现 Distinct,使其在给定的时间内工作,并且在此之后它应该重置并允许再次返回值。我需要这个应用程序中的热源,该应用程序将全年工作,现在停止,所以我担心性能,我需要在一段时间后允许这些值。还有 DistinctUntilChanged 但在我的情况下,值可以混合 - 例如:AAXA,DistinctUntilChanged 会给我 AXA,我需要结果 AX,并且在给定时间后应该重置 distinct。
我想找到关于热可观察和 IDisposable 对象作为事件类型的最佳实践。
假设我的代码将 Bitmap 对象生成为热可观察对象,并且我有多个订阅者。例如:
public static IObservable<Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return Directory.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.Select(x => new Bitmap(x))
.Publish()
.RefCount();
}
public void Main()
{
var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance);
var process1 = images.Subscribe(SaveBwImages);
var process2 = images.Subscribe(SaveScaledImages);
var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
}
Run Code Online (Sandbox Code Playgroud)
所以问题是:处理作为热 observable 来源的一次性资源的最佳实践是什么?
在这个例子中,我想在使用后处理图像,但我不知道 - 究竟是什么时候?
订阅事件的调用顺序并不明显,因此我无法处理“最后一个”事件。
提前致谢。
我是 rx 的新手,并且一直在使用 dot net 中的反应式扩展来处理一些网络代码。我的问题是,当我通过我提供的令牌触发取消时,我使用异步函数创建的 tcpClients 的 observable 没有像我预期的那样完成。这是我遇到问题的代码的简化版本:
public static class ListenerExtensions
{
public static IObservable<TcpClient> ToListenerObservable(
this IPEndPoint endpoint,
int backlog)
{
return new TcpListener(endpoint).ToListenerObservable(backlog);
}
public static IObservable<TcpClient> ToListenerObservable(
this TcpListener listener,
int backlog)
{
return Observable.Create<TcpClient>(async (observer, token) =>
{
listener.Start(backlog);
try
{
while (!token.IsCancellationRequested)
observer.OnNext(await Task.Run(() => listener.AcceptTcpClientAsync(), token));
//This never prints and onCompleted is never called.
Console.WriteLine("Completing..");
observer.OnCompleted();
}
catch (System.Exception error)
{
observer.OnError(error);
}
finally
{
//This is never executed and my …Run Code Online (Sandbox Code Playgroud) 目前我在Observable.Create()中使用await.ExecuteRequestAsync是一个调用HttpClient.GetAsync方法(String)的包装类
public IObservable<IList<ExampleResponseModel>> ListExamplesRx()
{
return Observable.Create<IList<ExampleResponseModel>>(
o =>
{
try
{
var url = string.Format(Routes.Examples.List);
IList<ExampleResponseModel> exampleResponse = await ExecuteRequestAsync<IList<ExampleResponseModel>>(url, HttpMethod.Get);
o.OnNext(exampleResponse);
o.OnCompleted();
}
catch (Exception e)
{
o.OnError(e);
}
return Disposable.Empty;
}
);
}
Run Code Online (Sandbox Code Playgroud)
这是最好的做法吗?有没有更合适的rx解决方案?
今天我终于偶然发现了一个非常简单的 RX 问题的解决方案:假设你有一个 Observable,它返回项目列表。喜欢
Observable<List<String>>。您经常会从 Web API 收到类似这样的响应。
但是,您可能希望对单个项目进行操作,在本例中是字符串。
flatMapIterable来救援!这个方便的运算符通过映射函数Iterables将一个流扁平化为从这些单个项目生成的流Iterables。
如果这很重要,我正在用 .NET Core 编写。
实施例在RxJava:转换Observable<List<Car>>到的序列Observable<Car>中RxJava
我开始研究 Reactive Extensions 以及如何将它们应用于常见场景以启用更易于管理和可读的代码。
我现在正在研究基本概念,并构建了一个简单的类:
public class ValidatableObject<TValue>
{
public bool IsValid { get; private set; } = true;
public TValue Value { get; }
public ICollection<IValidationRule<TValue>> Rules { get; }
public ValidatableObject(TValue value)
{
Value = value;
Rules = new List<IValidationRule<TValue>>();
Rules.ToObservable()
.All(rule => rule.Check(Value))
.Subscribe(b => IsValid = b);
}
}
public interface IValidationRule<T>
{
bool Check(T value);
}
public class FailingValidationRule<T> : IValidationRule<T>
{
public bool Check(T value) => false;
}
public static void main()
{
var …Run Code Online (Sandbox Code Playgroud) 所以我有这个代码:
ISubject<int> _processed = new ReplaySubject<int>();
_processed.Buffer(5000).Subscribe(UpdateProcessed);
// Start some process which calls _processed.OnNext
Run Code Online (Sandbox Code Playgroud)
我遇到的问题是有时缓冲区没有填满,因为最后一批小于 5000 并且进程在没有调用UpdateProcessed执行的情况下退出。
_processed处理完成后有没有办法刷新observable中剩余的项目?
我有这个代码片段:
static void Main(string[] args)
{
Observable.Range(1, 5).Subscribe(async x => await DoTheThing(x));
Console.WriteLine("done");
}
static async Task DoTheThing(int x)
{
await Task.Delay(TimeSpan.FromSeconds(x));
Console.WriteLine(x);
}
Run Code Online (Sandbox Code Playgroud)
我希望它会循环 5 次,每次循环后都会有一行打印为
1
2
3
4
5
Run Code Online (Sandbox Code Playgroud)
但令人惊讶的是,这将打印“完成”并立即终止。似乎 async+await 没有等待 Task.Delay 并退出。
语义似乎没有问题,那么我在 Subscribe 或 async 哪里出错了,如何修复它以满足我从 Rx 调用异步任务的请求?
谢谢。
c# ×10
system.reactive ×10
.net ×2
async-await ×2
.net-core ×1
asynchronous ×1
distinct ×1
rx.net ×1
subscribe ×1
task ×1
timeout ×1