avo*_*avo 10 c# callback system.reactive observable
我正在寻找一种优雅的方法来创建Observable一个简单的回调委托与Rx,类似的东西Observable.FromEventPattern?
说,我正在包装EnumWindows回调EnumWindowsProc我提供的Win32 API .
我知道我可以为这个回调创建一个临时的C#事件适配器并传递它FromEventPattern.此外,我可能IObservable手动实现,所以它将IObserver.OnNext从我的EnumWindowsProc回调调用.
在Rx中包含回调的现有模式是否缺少?
你可以使用一个Subject<T>可以用来从命令式编程世界转移到Rx的功能世界.
Subject<T>同时实现了IObservable<T>和IObserver<T>,这样你就可以调用它OnNext,OnError并且OnCompleted方法和用户将会收到通知.
如果你想公开Subject<T>作为一个属性,那么你应该这样做,.AsObservable()因为这隐藏了事实上,IObservable<T>实际上是一个Subject<T>.它使事情变得如此((Subject<string>) obj.Event).OnNext("Foo")不可能.
请记住,像 中使用的回调EnumWindows与 Rx 略有不同。具体来说,回调可以通过其返回值与调用者进行通信。Rx 观察者无法做到这一点。此外,回调可以接收多个参数,但 Rx 观察者接收单个值。因此,您需要将多个参数包装到单个对象中。
考虑到这一点,使用 a 的替代方法Subject是使用Observable.Create。这样,您仅在实际存在观察者时才注册回调,如果该观察者取消订阅,则取消注册它。
对于您使用的同步 API 示例,您可能会执行类似的操作。请注意,在这个示例中,实际上没有办法在中途取消注册回调,因为在我们返回取消订阅一次性内容之前,这一切都是同步发生的。
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
return Observable.Create<Foo>(observer =>
{
FooApi.enumerate(arg1, arg2, e =>
{
observer.OnNext(new Foo(e));
return true;
});
// In your case, FooApi.enumerate is actually synchronous
// so when we get to this line of code, we know
// the stream is complete.
observer.OnCompleted();
return Disposable.Empty;
});
}
// Usage
WrapFooApi("a", "b").Take(1).Subscribe(...); // only takes first item
Run Code Online (Sandbox Code Playgroud)
我们可以通过引入一点异步性来解决无法提前停止的问题,这将使观察者有时间获得可以处理的一次性物品来通知您。我们可以使用CreateAsync它来获取CancellationToken当观察者取消订阅时将取消的。我们可以运行FooApi里面的代码Task.Run:
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
return Observable.CreateAsync<Foo>(async (observer, ct) =>
{
await Task.Run(() => FooApi.register_callback(arg1, arg2, e =>
{
observer.OnNext(e);
// Returning false will stop the enumeration
return !ct.IsCancellationRequested;
}));
observer.OnCompleted();
});
}
Run Code Online (Sandbox Code Playgroud)
在更传统的异步回调 API 中,您在某个点注册并在其他点取消注册,您可能会遇到类似这样的情况:
public static IObservable<Foo> WrapFooApi(string args)
{
return Observable.Create<Foo>(observer =>
{
FooToken token = default(FooToken);
var unsubscribe = Disposable.Create(() => FooApi.Unregister(token));
token = FooApi.Register(args, e =>
{
observer.OnNext(new Foo(e));
});
return unsubscribe;
});
}
Run Code Online (Sandbox Code Playgroud)