Dor*_*rus 1 c# system.reactive
我正在从Rx向网络写入数据.当然,我Finally在订阅结束时用来关闭我的流.这在干净的OnError()和OnComplete().RX将运行OnNext() ... OnNext(),OnComplete(),Finally()按顺序排列.
但是,有时我想提前终止序列,我这样做是为了使用Dispose().不知怎的Finally(),现在与最后一次OnNext()调用并行运行,导致仍然写入流中的异常OnNext(),以及不完整的写入.
我的订阅看起来大致如下:
NetworkStream stm = client.GetStream();
IDisposable disp = obs
.Finally(() => {
client.Close();
})
.Subscribe(d => {
client.GetStream().Write(d.a, 0, d.a.Lenght);
client.GetStream().Write(d.b, 0, d.b.Lenght);
} () => {
client.GetStream().Write(something(), 0, 1);
});
Thread.sleep(1000);
disp.Dispose();
Run Code Online (Sandbox Code Playgroud)
我也试过了替代方案CancellationToken.
如何正确取消订阅?我不介意它跳过OnComplete(),只要Finally()还在运行.但是,Finally()并行运行存在问题.
我也觉得应该有一个更好的方法来管理资源,通过将声明移动到序列中,这将是一个更好的解决方案.
编辑:以下代码更清楚地显示问题.我希望它总是打印出来,相反,它会经常给出错误,表示在最后一次之前Dispose结束OnNext.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Try finally");
for (int i = 0; i < 10; i++)
{
Finally();
}
Console.WriteLine("Try using");
for (int i = 0; i < 10; i++)
{
Using();
}
Console.WriteLine("Try using2");
for (int i = 0; i < 10; i++)
{
Using2();
}
Console.ReadKey();
}
private static void Using2()
{
bool b = true, c = true, d;
var dis = Disposable.Create(() => c = b);
IDisposable obDis = Observable.Using(
() => dis,
_ => Observable.Create<Unit>(obs=>
Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(1)
).Subscribe(__ => { b = false; Thread.Sleep(100); b = true; })))
.Subscribe();
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
private static void Using()
{
bool b = true, c = true, d;
var dis = Disposable.Create(() => c = b);
IDisposable obDis = Observable.Using(
() => dis,
_ => Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(1)
)).Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
private static void Finally()
{
bool b = true, c = true, d;
IDisposable obDis = Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
_ => DateTime.Now.AddMilliseconds(1)
)
.Finally(() => c = b)
.Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
}
}
Run Code Online (Sandbox Code Playgroud)
Finally很可能不是你想要的.取消订阅时,它不会处置您的资源.相反,它的行为就像finallyC#中的普通块一样,也就是说,无论相应的try-block中的代码是否引发异常,它都将保证执行某些代码.此外,鉴于MSDN上的这个问题,您的代码Finally甚至可能不会在每种情况下执行,因为您的订阅未指定错误处理程序.
你可能想要的是Using:
IDisposable disp = Observable
.Using(
() => Disposable.Create(() => client.Close),
_ => obs)
.Subscribe(....);
Run Code Online (Sandbox Code Playgroud)
Using 只要可观察终止或订阅被取消,就会小心处理资源.
假设client是一个TcpClient,它变得更简单:
IDisposable disp = Observable
.Using(
() => client),
_ => obs)
.Subscribe(....);
Run Code Online (Sandbox Code Playgroud)
我希望调用OnNext不会与关闭客户端重叠,即使提前取消预订,但我还没有测试过.
最后一件事:注意关闭外部变量,如stm示例中所示.总是和当地人一起工作更安全.完全重写,因为我会尝试它是这样的:
IDisposable disp = Observable.Using(
() => client,
_ => Observable.Using(
() => client.GetStream(),
stream => Observable.Create<Unit>(observer => obs
.Subscribe(
d => {
stream.Write(d.a, 0, d.a.Lenght);
stream.Write(d.b, 0, d.b.Lenght);
},
() => {
stream.Write(something(), 0, 1);
}))))
.Subscribe();
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
279 次 |
| 最近记录: |