我最近在C#中编写了一个快速而肮脏的概念验证代理服务器,作为使Java Web应用程序与驻留在另一台服务器上的传统VB6应用程序进行通信的努力的一部分.这简直太荒谬了:
代理服务器和客户端都使用相同的消息格式; 在代码中我使用一个ProxyMessage类来表示来自客户端的请求和服务器生成的响应:
public class ProxyMessage
{
int Length; // message length (not including the length bytes themselves)
string Body; // an XML string containing a request/response
// writes this message instance in the proper network format to stream
// (helper for response messages)
WriteToStream(Stream stream) { ... }
}
Run Code Online (Sandbox Code Playgroud)
消息尽可能简单:正文的长度+消息正文.
我有一个单独的ProxyClient类,表示与客户端的连接.它处理代理和单个客户端之间的所有交互.
我想知道的是它们是简化与异步套接字编程相关的样板代码的设计模式还是最佳实践?例如,您需要注意管理读取缓冲区,以免意外丢失字节,并且需要跟踪处理当前消息的距离.在我当前的代码中,我在我的回调函数中完成所有这些工作TcpClient.BeginRead,并在一些实例变量的帮助下管理缓冲区的状态和当前的消息处理状态.
我传递给我的回调函数的代码BeginRead如下,以及上下文的相关实例变量.代码似乎工作正常"原样",但我想知道它是否可以重构一点以使其更清晰(或者它可能已经是?).
private enum BufferStates
{
GetMessageLength,
GetMessageBody
}
// The read buffer. Initially 4 bytes because we are initially
// waiting to receive the message length (a 32-bit int) from the client
// on first connecting. By constraining the buffer length to exactly 4 bytes,
// we make the buffer management a bit simpler, because
// we don't have to worry about cases where the buffer might contain
// the message length plus a few bytes of the message body.
// Additional bytes will simply be buffered by the OS until we request them.
byte[] _buffer = new byte[4];
// A count of how many bytes read so far in a particular BufferState.
int _totalBytesRead = 0;
// The state of the our buffer processing. Initially, we want
// to read in the message length, as it's the first thing
// a client will send
BufferStates _bufferState = BufferStates.GetMessageLength;
// ...ADDITIONAL CODE OMITTED FOR BREVITY...
// This is called every time we receive data from
// the client.
private void ReadCallback(IAsyncResult ar)
{
try
{
int bytesRead = _tcpClient.GetStream().EndRead(ar);
if (bytesRead == 0)
{
// No more data/socket was closed.
this.Dispose();
return;
}
// The state passed to BeginRead is used to hold a ProxyMessage
// instance that we use to build to up the message
// as it arrives.
ProxyMessage message = (ProxyMessage)ar.AsyncState;
if(message == null)
message = new ProxyMessage();
switch (_bufferState)
{
case BufferStates.GetMessageLength:
_totalBytesRead += bytesRead;
// if we have the message length (a 32-bit int)
// read it in from the buffer, grow the buffer
// to fit the incoming message, and change
// state so that the next read will start appending
// bytes to the message body
if (_totalBytesRead == 4)
{
int length = BitConverter.ToInt32(_buffer, 0);
message.Length = length;
_totalBytesRead = 0;
_buffer = new byte[message.Length];
_bufferState = BufferStates.GetMessageBody;
}
break;
case BufferStates.GetMessageBody:
string bodySegment = Encoding.ASCII.GetString(_buffer, _totalBytesRead, bytesRead);
_totalBytesRead += bytesRead;
message.Body += bodySegment;
if (_totalBytesRead >= message.Length)
{
// Got a complete message.
// Notify anyone interested.
// Pass a response ProxyMessage object to
// with the event so that receivers of OnReceiveMessage
// can send a response back to the client after processing
// the request.
ProxyMessage response = new ProxyMessage();
OnReceiveMessage(this, new ProxyMessageEventArgs(message, response));
// Send the response to the client
response.WriteToStream(_tcpClient.GetStream());
// Re-initialize our state so that we're
// ready to receive additional requests...
message = new ProxyMessage();
_totalBytesRead = 0;
_buffer = new byte[4]; //message length is 32-bit int (4 bytes)
_bufferState = BufferStates.GetMessageLength;
}
break;
}
// Wait for more data...
_tcpClient.GetStream().BeginRead(_buffer, 0, _buffer.Length, this.ReadCallback, message);
}
catch
{
// do nothing
}
}
Run Code Online (Sandbox Code Playgroud)
到目前为止,我唯一真正的想法是将缓冲区相关的东西提取到一个单独的MessageBuffer类中,只需让我的读回调在它们到达时向它添加新的字节.在MessageBuffer随后会担心这样的事情目前BufferState,当它接收完整的消息,该触发事件ProxyClient可以然后进一步传播到主代理服务器代码,其中请求可以被处理.
我必须克服类似的问题。这是我的解决方案(经过修改以适合您自己的示例)。
我们创建一个包装器Stream( 的超类NetworkStream,它是 的超类TcpClient或其他)。它监视读取。当读取某些数据时,它会被缓冲。当我们收到长度指示符(4 字节)时,我们检查是否有完整的消息(4 字节 + 消息正文长度)。当我们这样做时,我们会引发一个MessageReceived带有消息正文的事件,并从缓冲区中删除该消息。该技术自动处理碎片消息和每包多消息的情况。
public class MessageStream : IMessageStream, IDisposable
{
public MessageStream(Stream stream)
{
if(stream == null)
throw new ArgumentNullException("stream", "Stream must not be null");
if(!stream.CanWrite || !stream.CanRead)
throw new ArgumentException("Stream must be readable and writable", "stream");
this.Stream = stream;
this.readBuffer = new byte[512];
messageBuffer = new List<byte>();
stream.BeginRead(readBuffer, 0, readBuffer.Length, new AsyncCallback(ReadCallback), null);
}
// These belong to the ReadCallback thread only.
private byte[] readBuffer;
private List<byte> messageBuffer;
private void ReadCallback(IAsyncResult result)
{
int bytesRead = Stream.EndRead(result);
messageBuffer.AddRange(readBuffer.Take(bytesRead));
if(messageBuffer.Count >= 4)
{
int length = BitConverter.ToInt32(messageBuffer.Take(4).ToArray(), 0); // 4 bytes per int32
// Keep buffering until we get a full message.
if(messageBuffer.Count >= length + 4)
{
messageBuffer.Skip(4);
OnMessageReceived(new MessageEventArgs(messageBuffer.Take(length)));
messageBuffer.Skip(length);
}
}
// FIXME below is kinda hacky (I don't know the proper way of doing things...)
// Don't bother reading again. We don't have stream access.
if(disposed)
return;
try
{
Stream.BeginRead(readBuffer, 0, readBuffer.Length, new AsyncCallback(ReadCallback), null);
}
catch(ObjectDisposedException)
{
// DO NOTHING
// Ends read loop.
}
}
public Stream Stream
{
get;
private set;
}
public event EventHandler<MessageEventArgs> MessageReceived;
protected virtual void OnMessageReceived(MessageEventArgs e)
{
var messageReceived = MessageReceived;
if(messageReceived != null)
messageReceived(this, e);
}
public virtual void SendMessage(Message message)
{
// Have fun ...
}
// Dispose stuff here
}
Run Code Online (Sandbox Code Playgroud)