Eri*_*öök 1 c# task system.reactive async-await .net-core
我是 rx 的新手,并且一直在使用 dot net 中的反应式扩展来处理一些网络代码。我的问题是,当我通过我提供的令牌触发取消时,我使用异步函数创建的 tcpClients 的 observable 没有像我预期的那样完成。这是我遇到问题的代码的简化版本:
public static class ListenerExtensions
{
    public static IObservable<TcpClient> ToListenerObservable(
        this IPEndPoint endpoint,
        int backlog)
    {
        return new TcpListener(endpoint).ToListenerObservable(backlog);
    }
    public static IObservable<TcpClient> ToListenerObservable(
        this TcpListener listener,
        int backlog)
    {
        return Observable.Create<TcpClient>(async (observer, token) => 
        {
            listener.Start(backlog);
            try
            {
                while (!token.IsCancellationRequested)
                    observer.OnNext(await Task.Run(() => listener.AcceptTcpClientAsync(), token));
                //This never prints and onCompleted is never called.
                Console.WriteLine("Completing..");
                observer.OnCompleted();
            }
            catch (System.Exception error)
            {
                observer.OnError(error);   
            }
            finally
            {
                //This is never executed and my progam exits without closing the listener.
                Console.WriteLine("Stopping listener...");
                listener.Stop();
            }
        });
    }
}
class Program
{
   static void Main(string[] args)
    {
        var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323);
        var cancellation = new CancellationTokenSource();
        home.ToListenerObservable(10)
            .Subscribe(
                onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"),
                onError: e => Console.WriteLine($"Error: {e.Message}"),
                onCompleted: () => Console.WriteLine("Complete"), // Never happens
                token: cancellation.Token);
        Console.WriteLine("Any key to cancel");
        Console.ReadKey();
        cancellation.Cancel();
        Thread.Sleep(1000);
    }
}
如果我运行它并连接到 localhost:2323,我可以看到我得到了一系列已连接的 tcpClients。但是,如果我触发取消取消令牌的取消程序退出而不关闭侦听器并像我期望的那样发出 onCompleted 事件。我究竟做错了什么?
尽量避免编写太多try/catch代码并使用取消令牌总是好的。这是一种在不脱离标准 Rx 操作符的情况下做你正在做的事情的方法。请不要我无法完全测试此代码,因此可能仍需要进行一些调整。
尝试这个:
var query = Observable.Create<TcpClient>(o =>
{
    var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323);
    var listener = new TcpListener(home);
    listener.Start();
    return
        Observable
            .Defer(() => Observable.FromAsync(() => listener.AcceptTcpClientAsync()))
            .Repeat()
            .Subscribe(o);
});
var completer = new Subject<Unit>();
var subscription =
    query
        .TakeUntil(completer)
        .Subscribe(
            onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"),
            onError: e => Console.WriteLine($"Error: {e.Message}"),
            onCompleted: () => Console.WriteLine("Complete"));
Console.WriteLine("Enter to cancel");
Console.ReadLine();
completer.OnNext(Unit.Default);
Thread.Sleep(1000);
这里的关键是completer像取消令牌一样询问。它自然地完成订阅,不像subscription.Dispose()没有OnCompleted调用就完成它。
| 归档时间: | 
 | 
| 查看次数: | 1001 次 | 
| 最近记录: |