我尝试编写控制台可观察的内容,如下例所示,但它不起作用。订阅存在一些问题。如何解决这些问题?
static class Program
{
static async Task Main(string[] args)
{
// var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
// var observable = FromConsole().Publish().RefCount(); // doesn't work
var observable = FromConsole(); // doesn't work
observable.Subscribe(Console.WriteLine);
await Task.Delay(1500);
observable.Subscribe(Console.WriteLine);
await new TaskCompletionSource().Task;
}
static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(Console.ReadLine());
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
如果我使用Observable.Interval,它会订阅两次,并且一个输入有两个输出。如果我使用任何版本的FromConsole,我都会有一个订阅和一个阻塞的线程。