可观察到Rx中的回调

avo*_*avo 10 c# callback system.reactive observable

我正在寻找一种优雅的方法来创建Observable一个简单的回调委托与Rx,类似的东西Observable.FromEventPattern

说,我正在包装EnumWindows回调EnumWindowsProc我提供的Win32 API .

我知道我可以为这个回调创建一个临时的C#事件适配器并传递它FromEventPattern.此外,我可能IObservable手动实现,所以它将IObserver.OnNext从我的EnumWindowsProc回调调用.

在Rx中包含回调的现有模式是否缺少?

Dir*_*irk 8

你可以使用一个Subject<T>可以用来从命令式编程世界转移到Rx的功能世界.

Subject<T>同时实现了IObservable<T>IObserver<T>,这样你就可以调用它OnNext,OnError并且OnCompleted方法和用户将会收到通知.

如果你想公开Subject<T>作为一个属性,那么你应该这样做,.AsObservable()因为这隐藏了事实上,IObservable<T>实际上是一个Subject<T>.它使事情变得如此((Subject<string>) obj.Event).OnNext("Foo")不可能.


Bra*_*don 5

请记住,像 中使用的回调EnumWindows与 Rx 略有不同。具体来说,回调可以通过其返回值与调用者进行通信。Rx 观察者无法做到这一点。此外,回调可以接收多个参数,但 Rx 观察者接收单个值。因此,您需要将多个参数包装到单个对象中。

考虑到这一点,使用 a 的替代方法Subject是使用Observable.Create。这样,您仅在实际存在观察者时才注册回调,如果该观察者取消订阅,则取消注册它。

对于您使用的同步 API 示例,您可能会执行类似的操作。请注意,在这个示例中,实际上没有办法在中途取消注册回调,因为在我们返回取消订阅一次性内容之前,这一切都是同步发生的。

public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
    return Observable.Create<Foo>(observer =>
    {
        FooApi.enumerate(arg1, arg2, e =>
        {
            observer.OnNext(new Foo(e));
            return true;
        });

        // In your case, FooApi.enumerate is actually synchronous
        // so when we get to this line of code, we know
        // the stream is complete.
        observer.OnCompleted();
        return Disposable.Empty;
    });
}

// Usage
WrapFooApi("a", "b").Take(1).Subscribe(...); // only takes first item
Run Code Online (Sandbox Code Playgroud)

我们可以通过引入一点异步性来解决无法提前停止的问题,这将使观察者有时间获得可以处理的一次性物品来通知您。我们可以使用CreateAsync它来获取CancellationToken当观察者取消订阅时将取消的。我们可以运行FooApi里面的代码Task.Run

public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
    return Observable.CreateAsync<Foo>(async (observer, ct) =>
    {
        await Task.Run(() => FooApi.register_callback(arg1, arg2, e =>
        {
            observer.OnNext(e);

            // Returning false will stop the enumeration
            return !ct.IsCancellationRequested;
        }));
        observer.OnCompleted();
    });
}
Run Code Online (Sandbox Code Playgroud)

在更传统的异步回调 API 中,您在某个点注册并在其他点取消注册,您可能会遇到类似这样的情况:

public static IObservable<Foo> WrapFooApi(string args)
{
    return Observable.Create<Foo>(observer =>
    {
        FooToken token = default(FooToken);
        var unsubscribe = Disposable.Create(() => FooApi.Unregister(token));
        token = FooApi.Register(args, e =>
        {
            observer.OnNext(new Foo(e));
        });

        return unsubscribe;
    });
}
Run Code Online (Sandbox Code Playgroud)