如何从控制台输入创建 IObservable<string>

dgz*_*rgo 1 c# console system.reactive observable

我尝试编写控制台可观察的内容,如下例所示,但它不起作用。订阅存在一些问题。如何解决这些问题?

static class Program
{
    static async Task Main(string[] args)
    {
        // var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
        // var observable = FromConsole().Publish().RefCount(); // doesn't work
        var observable = FromConsole(); // doesn't work
        observable.Subscribe(Console.WriteLine);
        await Task.Delay(1500);
        observable.Subscribe(Console.WriteLine);
        await new TaskCompletionSource().Task;
    }

    static IObservable<string> FromConsole()
    {
        return Observable.Create<string>(async observer =>
        {
            while (true)
            {
                observer.OnNext(Console.ReadLine());
            }
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

如果我使用Observable.Interval,它会订阅两次,并且一个输入有两个输出。如果我使用任何版本的FromConsole,我都会有一个订阅和一个阻塞的线程。

Eni*_*ity 6

首先,通常最好避免使用它Observable.Create来创建可观察量 - 它当然是为了这个目的,但它可能会创建由于其阻塞性质而不像您想象的那样表现的可观察量。正如您所发现的!

相反,如果可能,请使用内置运算符来创建可观察量。在这种情况下可以做到这一点。

我的版本FromConsole是这样的:

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
Run Code Online (Sandbox Code Playgroud)

Observable.Start实际上就像Task.Run可观察量一样。它Console.ReadLine()无阻碍地呼唤着我们。

/对重复Observable.Defer调用。如果没有它,它只会永远重复调用并返回一个字符串。RepeatObservable.Start(() => Console.ReadLine())DeferObservable.Start

这样就解决了。

现在,第二个问题是您希望查看Console.ReadLine()两个可FromConsole()观察对象订阅的输出值。

由于这种方式的Console.ReadLine工作原理,您可以从每个订阅中获取值,但一次只能获取一个值。试试这个代码:

static async Task Main(string[] args)
{
    var observable = FromConsole();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
        
Run Code Online (Sandbox Code Playgroud)

当我运行它时,我得到这样的输出:

1:ddfd
2:dfff
1:dfsdfs
2:sdffdfd
1:sdfsdfsdf
Run Code Online (Sandbox Code Playgroud)

原因是每个订阅都会启动一个新的订阅FromConsole。因此,您有两个对Console.ReadLine()它们的调用有效地排队,并且每个调用仅获得每个备用输入。1因此&之间交替2

因此,要解决这个问题,您只需要.Publish().RefCount()操作符对即可。

尝试这个:

static async Task Main(string[] args)
{
    var observable = FromConsole().Publish().RefCount();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
        
Run Code Online (Sandbox Code Playgroud)

我现在得到:

1:Hello
2:Hello
1:World
2:World
Run Code Online (Sandbox Code Playgroud)

简而言之,它是非阻塞可FromConsole观察值的组合以及它的使用,.Publish().RefCount()使得它按照您期望的方式工作。