Reactive Extensions同步订阅

Luk*_*kie 4 .net reactive-programming system.reactive

有人可以帮我对IObserver进行同步订阅,这样调用方法就会阻塞,直到订阅完成.例如:

出版者

public static class Publisher {
public static IObservable<string> NonBlocking()
    {
        return Observable.Create<string>(
            observable =>
            {
                Task.Run(() =>
                {
                    observable.OnNext("a");
                    Thread.Sleep(1000);
                    observable.OnNext("b");
                    Thread.Sleep(1000);
                    observable.OnCompleted();
                    Thread.Sleep(1000);
                });

                return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
            });
    }
Run Code Online (Sandbox Code Playgroud)

}

订户

public static class Subscriber{
public static bool Subscribe()
    {
        Publisher.NonBlocking().Subscribe((s) =>
        {
            Debug.WriteLine(s);
        }, () =>
        {
            Debug.WriteLine("Complete");
        });
        // This will currently return true before the subscription is complete
        // I want to block and not Return until the Subscriber is Complete
        return true;
    }
Run Code Online (Sandbox Code Playgroud)

}

cwh*_*ris 6

你需要使用System.Reactive.Threading.Task这个:

把你的观察变成一个任务......

var source = Publisher.NonBlocking()
    .Do(
        (s) => Debug.WriteLines(x),
        ()  => Debug.WriteLine("Completed")
    )
    .LastOrDefault()
    .ToTask();
Run Code Online (Sandbox Code Playgroud)

Do(...).Subscribe()就像Subscribe(...).所以Do只是添加了一些副作用.

LastOrDefault在那里因为Task创建的ToTask将只等待来自源的第一个项目Observable,如果没有产生任何项目,它将失败(抛出).因此,无论它产生什么,都会LastOrDefault有效地导致Task等待直到源完成.

所以在我们完成任务后,请等待它:

task.Wait(); // blocking
Run Code Online (Sandbox Code Playgroud)

或者使用async/await:

await task; // non-blocking
Run Code Online (Sandbox Code Playgroud)

编辑:

科里·尼尔森提出了一个很好的观点:

在最新版本的C#和Visual Studio中,您实际上可以await使用IObservable<T>.这是一个非常酷的功能,但它的工作方式与等待的方式略有不同Task.

等待任务时,它会导致任务运行.如果多次等待任务的单个实例,则该任务将仅执行一次.可观察量略有不同.您可以将observable视为具有多个返回值的异步函数...每次订阅observable时,observable/function都会执行.因此这两段代码有不同的含义:

等待观察者:

// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source; // Subscribe
await source; // Subscribe
Run Code Online (Sandbox Code Playgroud)

通过任务等待Observable:

// Console.WriteLine will be invoked once.
var source = Observable.Return(0).Do(Console.WriteLine);
var task = source.ToTask();
await task; // Subscribe
await task; // Just yield the task's result.
Run Code Online (Sandbox Code Playgroud)

所以,实质上,等待Observable的工作如下:

// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source.ToTask(); // Subscribe
await source.ToTask(); // Subscribe
Run Code Online (Sandbox Code Playgroud)

但是,await observable语法在Xamerin Studio中不起作用(截至本文撰写时).如果您使用的是Xamerin Studio,我强烈建议您ToTask在最后一刻使用,以模拟Visual Studio await observable语法的行为.

  • 你可以直接`等待'`IObservable <>`.它将返回序列中的最后一项. (2认同)