shi*_*ipr 2 c# system.reactive observer-pattern c#-5.0
经过对StackOverflow的大量努力和研究 - 其中大部分已经过时,因为Reactive Extensions代码最近发生了变化 - 我终于能够消除此Observable方法从套接字读取数据的所有编译错误,我理解这一点代码比我最初做的要好得多.但还不完全.有人可以用英语回复给我,回答两三个问题吗?
是从这种方法中提取的缓冲数据(或者如果我有错误,应该怎么做?)?是否有不再需要它的部分?虽然我真的喜欢与业务代码分离,并且只用一两种方法保留所有套接字代码,但有没有更好的方法(解耦和可读)?
public static IObservable<int> WhenDataReceived(this Socket socket, int byteCount, SocketFlags flags = SocketFlags.None)
{
Contract.Requires(byteCount > 0);
return Observable.Create<int>(
observer =>
{
byte[] buffer = new byte[byteCount];
int remainder = byteCount;
bool shutdown = false;
return Observable.Defer<int>(() =>
Task.Factory.FromAsync<int>(socket.BeginReceive(buffer, buffer.Length - remainder, remainder, flags,
(result) =>
{
var read = (int)result.AsyncState;
remainder -= read;
if (read == 0)
shutdown = true;
},
null), socket.EndReceive).ToObservable())
.TakeWhile(_ => remainder > 0 && !shutdown)
.TakeLast(1)
.Subscribe(
observer.OnNext,
ex =>
{
var socketError = ex as SocketException;
if (socketError != null
&& (socketError.SocketErrorCode == SocketError.Disconnecting
|| socketError.SocketErrorCode == SocketError.Shutdown))
{
observer.OnCompleted();
}
else { observer.OnError(ex); }
},
observer.OnCompleted);
});
}
}
Run Code Online (Sandbox Code Playgroud)
调用它的函数仍然有我不理解的编译错误(.Do和.BitConverter有一些无效的参数):
static IObservable<string> StartClient(this IObserver<ScanInformation> observer, IPAddress ip, int port)
{
var client = Observable.Using(
() => new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp),
socket =>
from _ in socket.WhenConnected(ip, port)
from message in
(from first in socket.WhenDataReceived(4)
let length = BitConverter.ToInt32(first, 0)
from message in
Observable.If(
condition: () => length > 0,
thenSource: from second in socket.WhenDataReceived(length)
select Encoding.UTF8.GetString(second, 0, length),
elseSource: Observable.Return<string>(null))
select message)
.Repeat()
.TakeWhile(message => message != null)
select message);
return
client.Do(observer).TakeLast(1);
}
Run Code Online (Sandbox Code Playgroud)
编译错误都是由于传递错误的类型参数造成的.
你在Do()上的编译错误是因为你的观察者是一个IObserver<ScanInformation>但客户端是一个IObservable<string>.你的意思是将字符串转换为实例ScanInformation吗?
你在BitConverter上的编译接受一个byte[]作为第一个参数(要转换的字节缓冲区),但是你传递了一个int假定你从WhenDataReceived返回缓冲区的某个点; 现在你传回读取的字节数.
Rx没有太大变化,以至于这种代码会破坏.您的代码看起来可能已经遭受了一些复制/粘贴错误 - 这可能会让您感到困惑而不是有用.有一个看看这个博客帖子的套接字读取使用RX在一个相当简单的方式来包装TPL呼叫的合理期待实现.这个讨论也可能具有启发性.
ObservableSocketRxx库中还有一个相当不错的选择.看到这里.