Nat*_*tan 4 console events readline system.reactive
我正在学习RX,并希望使用Console.ReadLine作为可观察序列的来源.
我知道我可以使用"yield return"创建"IEnumerable",但对于我的具体用例,我决定创建一个C#事件,以便潜在的许多观察者能够共享相同的键盘输入.
这是我的代码:
class Program
{
private delegate void OnNewInputLineHandler(string line);
private static event OnNewInputLineHandler OnNewInputLineEvent = _ => {};
static void Main(string[] args)
{
Task.Run((Action) GetInput);
var input = ConsoleInput();
input.Subscribe(s=>Console.WriteLine("1: " + s));
Thread.Sleep(30000);
}
private static void GetInput()
{
while (true)
OnNewInputLineEvent(Console.ReadLine());
}
private static IObservable<string> ConsoleInput()
{
return Observable.Create<string>(
(IObserver<string> observer) =>
{
OnNewInputLineHandler h = observer.OnNext;
OnNewInputLineEvent += h;
return Disposable.Create(() => { OnNewInputLineEvent -= h; });
});
}
}
Run Code Online (Sandbox Code Playgroud)
我的问题 - 当我运行如上所示的GetInput方法时,第一个输入行不会被发送到序列(但它被发送到事件处理程序).
但是,如果我用以下版本替换它,一切都按预期工作:
private static void GetInput()
{
while (true)
{
var s = Console.ReadLine();
OnNewInputLineEvent(s);
}
}
Run Code Online (Sandbox Code Playgroud)
有人可以解释为什么会发生这种情况吗?
你想让自己的生活变得困难.几乎总有一种方法可以使用Rx简化操作.这只是学习在功能上思考而不是在程序上思考的问题.
这就是你所需要的:
class Program
{
static void Main(string[] args)
{
var subscription = ConsoleInput().Subscribe(s => Console.WriteLine("1: " + s));
Thread.Sleep(30000);
subscription.Dispose();
}
private static IObservable<string> ConsoleInput()
{
return
Observable
.FromAsync(() => Console.In.ReadLineAsync())
.Repeat()
.Publish()
.RefCount()
.SubscribeOn(Scheduler.Default);
}
}
Run Code Online (Sandbox Code Playgroud)
这允许多个订阅者通过共享一个输入.Publish().RefCount()
.而.SubscribeOn(Scheduler.Default)
推动认购出一个新的线程-没有它,你块上的订阅.
归档时间: |
|
查看次数: |
1220 次 |
最近记录: |