相关疑难解决方法(0)

可观察订阅如何优雅地终止?

我正在尝试使用 Reactive Extensions (Rx) 来处理数据流。但是,每个元素的处理可能需要一些时间。为了中断处理,我使用了CancellationToken,它有效地停止了订阅。

当请求取消时,如何优雅地完成当前工作并正确终止而不会丢失任何数据?

例子

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));

observable
    .Subscribe(
        value =>
            {
                Console.WriteLine(value);
                Thread.Sleep(500); // Simulate processing
                
                if (cts.Token.IsCancellationRequested)
                {
                    Console.WriteLine("Cancellation detected on {0}.", value);
                    Thread.Sleep(500); // Simulate some time consuming shutdown
                    Console.WriteLine("Cleaning up done for {0}.", value);
                }
            },
        () => Console.WriteLine("Completed"),
        cts.Token);
        
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");
Run Code Online (Sandbox Code Playgroud)

输出

0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive cancellation

5
推荐指数
1
解决办法
1332
查看次数

取消使用异步函数创建的可观察对象

我是 rx 的新手,并且一直在使用 dot net 中的反应式扩展来处理一些网络代码。我的问题是,当我通过我提供的令牌触发取消时,我使用异步函数创建的 tcpClients 的 observable 没有像我预期的那样完成。这是我遇到问题的代码的简化版本:

public static class ListenerExtensions
{
    public static IObservable<TcpClient> ToListenerObservable(
        this IPEndPoint endpoint,
        int backlog)
    {
        return new TcpListener(endpoint).ToListenerObservable(backlog);
    }

    public static IObservable<TcpClient> ToListenerObservable(
        this TcpListener listener,
        int backlog)
    {
        return Observable.Create<TcpClient>(async (observer, token) => 
        {
            listener.Start(backlog);

            try
            {
                while (!token.IsCancellationRequested)
                    observer.OnNext(await Task.Run(() => listener.AcceptTcpClientAsync(), token));
                //This never prints and onCompleted is never called.
                Console.WriteLine("Completing..");
                observer.OnCompleted();
            }
            catch (System.Exception error)
            {
                observer.OnError(error);   
            }
            finally
            {
                //This is never executed and my …
Run Code Online (Sandbox Code Playgroud)

c# task system.reactive async-await .net-core

1
推荐指数
1
解决办法
1001
查看次数