Eri*_*sch 148 .net c# networking tcp scalability
我正处于编写新的Windows服务应用程序的设计阶段,该应用程序接受长连接的TCP/IP连接(即,这不像HTTP,其中有许多短连接,而是客户端连接并保持连接数小时或数天或甚至几周).
我正在寻找设计网络架构的最佳方法的想法.我将需要为该服务启动至少一个线程.我正在考虑使用Asynch API(BeginRecieve等),因为我不知道在任何给定时间(可能是数百个)我将连接多少客户端.我绝对不想为每个连接启动一个线程.
数据将主要从我的服务器流向客户端,但有时会从客户端发送一些命令.这主要是一个监控应用程序,我的服务器定期向客户端发送状态数据.
有关尽可能扩展的最佳方法的任何建议吗?基本工作流程 谢谢.
编辑:要明确,我正在寻找基于.net的解决方案(如果可能,C#,但任何.net语言都可以)
BOUNTY注意:要获得赏金,我希望不仅仅是一个简单的答案.我需要一个解决方案的工作示例,作为指向我可以下载的内容的指针或在线的简短示例.它必须是基于.net和Windows(任何.net语言都可以接受)
编辑:我要感谢所有给出好答案的人.不幸的是,我只能接受一个,我选择接受更为人熟知的Begin/End方法.Esac的解决方案可能会更好,但它仍然足够新,我不确定它将如何运作.
我已经提出了我认为很好的所有答案,我希望我能为你们做更多的事情.再次感谢.
Kev*_*bet 92
我过去曾写过类似的东西.从我多年前的研究表明,使用异步套接字编写自己的套接字实现是最好的选择.这意味着没有真正做任何事的客户实际上需要相对较少的资源.任何确实发生的事情都由.net线程池处理.
我把它写成一个管理服务器所有连接的类.
我只是使用一个列表来保存所有客户端连接,但如果你需要更快的查找更大的列表,你可以随意编写它.
private List<xConnection> _sockets;
Run Code Online (Sandbox Code Playgroud)
你还需要插座实际上听取连接.
private System.Net.Sockets.Socket _serverSocket;
Run Code Online (Sandbox Code Playgroud)
start方法实际上启动服务器套接字并开始侦听任何接收连接.
public bool Start()
{
System.Net.IPHostEntry localhost = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
System.Net.IPEndPoint serverEndPoint;
try
{
serverEndPoint = new System.Net.IPEndPoint(localhost.AddressList[0], _port);
}
catch (System.ArgumentOutOfRangeException e)
{
throw new ArgumentOutOfRangeException("Port number entered would seem to be invalid, should be between 1024 and 65000", e);
}
try
{
_serverSocket = new System.Net.Sockets.Socket(serverEndPoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
}
catch (System.Net.Sockets.SocketException e)
{
throw new ApplicationException("Could not create socket, check to make sure not duplicating port", e);
}
try
{
_serverSocket.Bind(serverEndPoint);
_serverSocket.Listen(_backlog);
}
catch (Exception e)
{
throw new ApplicationException("Error occured while binding socket, check inner exception", e);
}
try
{
//warning, only call this once, this is a bug in .net 2.0 that breaks if
// you're running multiple asynch accepts, this bug may be fixed, but
// it was a major pain in the ass previously, so make sure there is only one
//BeginAccept running
_serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
}
catch (Exception e)
{
throw new ApplicationException("Error occured starting listeners, check inner exception", e);
}
return true;
}
Run Code Online (Sandbox Code Playgroud)
我只想注意异常处理代码看起来很糟糕,但其原因是我在那里有异常抑制代码,以便任何异常都会被抑制并返回false如果设置了config选项,但我想删除它简洁的缘故.
上面的_serverSocket.BeginAccept(new AsyncCallback(acceptCallback)),_ serverSocket)实际上设置了我们的服务器套接字,以便在用户连接时调用acceptCallback方法.此方法从.Net线程池运行,如果您有许多阻塞操作,它会自动处理创建其他工作线程.这应该最佳地处理服务器上的任何负载.
private void acceptCallback(IAsyncResult result)
{
xConnection conn = new xConnection();
try
{
//Finish accepting the connection
System.Net.Sockets.Socket s = (System.Net.Sockets.Socket)result.AsyncState;
conn = new xConnection();
conn.socket = s.EndAccept(result);
conn.buffer = new byte[_bufferSize];
lock (_sockets)
{
_sockets.Add(conn);
}
//Queue recieving of data from the connection
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
//Queue the accept of the next incomming connection
_serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
}
catch (SocketException e)
{
if (conn.socket != null)
{
conn.socket.Close();
lock (_sockets)
{
_sockets.Remove(conn);
}
}
//Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
_serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
}
catch (Exception e)
{
if (conn.socket != null)
{
conn.socket.Close();
lock (_sockets)
{
_sockets.Remove(conn);
}
}
//Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
_serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
}
}
Run Code Online (Sandbox Code Playgroud)
上面的代码基本上刚刚接受了进来的连接,队列BeginReceive是一个回调,它将在客户端发送数据时运行,然后排队接下来acceptCallback将接受下一个客户端连接的下一个客户端连接.
该BeginReceive方法的调用是什么告诉套接字,当它从客户端接收数据做什么.因为BeginReceive,你需要给它一个字节数组,这是客户端发送数据时复制数据的地方.ReceiveCallback将调用该方法,这是我们处理接收数据的方式.
private void ReceiveCallback(IAsyncResult result)
{
//get our connection from the callback
xConnection conn = (xConnection)result.AsyncState;
//catch any errors, we'd better not have any
try
{
//Grab our buffer and count the number of bytes receives
int bytesRead = conn.socket.EndReceive(result);
//make sure we've read something, if we haven't it supposadly means that the client disconnected
if (bytesRead > 0)
{
//put whatever you want to do when you receive data here
//Queue the next receive
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
}
else
{
//Callback run but no data, close the connection
//supposadly means a disconnect
//and we still have to close the socket, even though we throw the event later
conn.socket.Close();
lock (_sockets)
{
_sockets.Remove(conn);
}
}
}
catch (SocketException e)
{
//Something went terribly wrong
//which shouldn't have happened
if (conn.socket != null)
{
conn.socket.Close();
lock (_sockets)
{
_sockets.Remove(conn);
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
编辑:在这种模式中,我忘了在这段代码中提到:
//put whatever you want to do when you receive data here
//Queue the next receive
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
Run Code Online (Sandbox Code Playgroud)
我通常会做的是在你想要的代码中,将数据包重新组装成消息,然后在线程池中创建它们作为作业.这样,在运行任何消息处理代码时,来自客户端的下一个块的BeginReceive不会延迟.
accept回调通过调用end receive完成读取数据套接字.这将填充begin receive函数中提供的缓冲区.一旦你在我离开评论的地方做任何你想做的事情,我们就会调用下一个BeginReceive方法,如果客户端发送更多数据,它将再次运行回调.现在这里是非常棘手的部分,当客户端发送数据时,您的接收回调可能只会被部分消息调用.重新组装会变得非常复杂.我使用自己的方法并创建了一种专有协议来执行此操作.我把它留了下来,但如果你要求,我可以添加它.这个处理程序实际上是我写过的最复杂的代码片段.
public bool Send(byte[] message, xConnection conn)
{
if (conn != null && conn.socket.Connected)
{
lock (conn.socket)
{
//we use a blocking mode send, no async on the outgoing
//since this is primarily a multithreaded application, shouldn't cause problems to send in blocking mode
conn.socket.Send(bytes, bytes.Length, SocketFlags.None);
}
}
else
return false;
return true;
}
Run Code Online (Sandbox Code Playgroud)
上面的send方法实际上使用了一个同步Send调用,对我而言,由于我的应用程序的消息大小和多线程特性,这很好.如果要发送到每个客户端,只需循环遍历_sockets列表.
您在上面引用的xConnection类基本上是一个简单的包装器,用于包含字节缓冲区的套接字,在我的实现中还有一些额外的东西.
public class xConnection : xBase
{
public byte[] buffer;
public System.Net.Sockets.Socket socket;
}
Run Code Online (Sandbox Code Playgroud)
此处还有参考资料,using因为当我们不包括在内时,我总是感到恼火.
using System.Net.Sockets;
Run Code Online (Sandbox Code Playgroud)
我希望这有用,它可能不是最干净的代码,但它有效.代码也有一些细微差别,你应该对改变感到厌倦.例如,BeginAccept任何时候只有一个叫.曾经有一个非常烦人的.net漏洞,这是多年前的事情,所以我不记得细节了.
此外,在ReceiveCallback代码中,我们处理从套接字接收的任何内容,然后排队下一次接收.这意味着对于单个套接字,我们实际上只 ReceiveCallback在任何时间点进行一次,并且我们不需要使用线程同步.但是,如果您重新排序此项以在提取数据后立即调用下一个接收,这可能会快一些,您需要确保正确同步线程.
此外,我砍掉了很多我的代码,但留下了正在发生的事情的本质.这应该是你设计的一个良好开端.如果您对此有任何疑问,请发表评论.
esa*_*sac 83
在C#中进行网络操作的方法有很多种.它们都使用不同的机制,因此具有高并发性的主要性能问题.开始*操作是许多人经常误认为是更快/最快的网络方式之一.
为了解决这些问题,他们引入了*Async方法集:来自MSDN http://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs.aspx
SocketAsyncEventArgs类是System.Net.Sockets .. ::.Socket类的一组增强的一部分,它提供了可供专用高性能套接字应用程序使用的备用异步模式.此类专为需要高性能的网络服务器应用程序而设计.应用程序可以仅使用增强型异步模式,也可以仅在目标热区域中使用(例如,在接收大量数据时).
这些增强功能的主要特征是避免在高容量异步套接字I/O期间重复分配和同步对象.当前由System.Net.Sockets .. ::.Socket类实现的Begin/End设计模式需要为每个异步套接字操作分配System .. ::.IAsyncResult对象.
在幕后,*Async API使用IO完成端口,这是执行网络操作的最快方式,请参阅http://msdn.microsoft.com/en-us/magazine/cc302334.aspx
只是为了帮助你,我包括我使用*Async API编写的telnet服务器的源代码.我只包括相关部分.另外需要注意的是,我不是内联处理数据,而是选择将其推送到在单独线程上处理的无锁(等待空闲)队列.请注意,我没有包含相应的Pool类,它只是一个简单的池,如果它是空的,它将创建一个新的对象,而Buffer类只是一个自我扩展的缓冲区,除非你收到一个不确定的东西,否则它不是真正需要的数据量.如果您想了解更多信息,请随时给我发送PM.
public class Telnet
{
private readonly Pool<SocketAsyncEventArgs> m_EventArgsPool;
private Socket m_ListenSocket;
/// <summary>
/// This event fires when a connection has been established.
/// </summary>
public event EventHandler<SocketAsyncEventArgs> Connected;
/// <summary>
/// This event fires when a connection has been shutdown.
/// </summary>
public event EventHandler<SocketAsyncEventArgs> Disconnected;
/// <summary>
/// This event fires when data is received on the socket.
/// </summary>
public event EventHandler<SocketAsyncEventArgs> DataReceived;
/// <summary>
/// This event fires when data is finished sending on the socket.
/// </summary>
public event EventHandler<SocketAsyncEventArgs> DataSent;
/// <summary>
/// This event fires when a line has been received.
/// </summary>
public event EventHandler<LineReceivedEventArgs> LineReceived;
/// <summary>
/// Specifies the port to listen on.
/// </summary>
[DefaultValue(23)]
public int ListenPort { get; set; }
/// <summary>
/// Constructor for Telnet class.
/// </summary>
public Telnet()
{
m_EventArgsPool = new Pool<SocketAsyncEventArgs>();
ListenPort = 23;
}
/// <summary>
/// Starts the telnet server listening and accepting data.
/// </summary>
public void Start()
{
IPEndPoint endpoint = new IPEndPoint(0, ListenPort);
m_ListenSocket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
m_ListenSocket.Bind(endpoint);
m_ListenSocket.Listen(100);
//
// Post Accept
//
StartAccept(null);
}
/// <summary>
/// Not Yet Implemented. Should shutdown all connections gracefully.
/// </summary>
public void Stop()
{
//throw (new NotImplementedException());
}
//
// ACCEPT
//
/// <summary>
/// Posts a requests for Accepting a connection. If it is being called from the completion of
/// an AcceptAsync call, then the AcceptSocket is cleared since it will create a new one for
/// the new user.
/// </summary>
/// <param name="e">null if posted from startup, otherwise a <b>SocketAsyncEventArgs</b> for reuse.</param>
private void StartAccept(SocketAsyncEventArgs e)
{
if (e == null)
{
e = m_EventArgsPool.Pop();
e.Completed += Accept_Completed;
}
else
{
e.AcceptSocket = null;
}
if (m_ListenSocket.AcceptAsync(e) == false)
{
Accept_Completed(this, e);
}
}
/// <summary>
/// Completion callback routine for the AcceptAsync post. This will verify that the Accept occured
/// and then setup a Receive chain to begin receiving data.
/// </summary>
/// <param name="sender">object which posted the AcceptAsync</param>
/// <param name="e">Information about the Accept call.</param>
private void Accept_Completed(object sender, SocketAsyncEventArgs e)
{
//
// Socket Options
//
e.AcceptSocket.NoDelay = true;
//
// Create and setup a new connection object for this user
//
Connection connection = new Connection(this, e.AcceptSocket);
//
// Tell the client that we will be echo'ing data sent
//
DisableEcho(connection);
//
// Post the first receive
//
SocketAsyncEventArgs args = m_EventArgsPool.Pop();
args.UserToken = connection;
//
// Connect Event
//
if (Connected != null)
{
Connected(this, args);
}
args.Completed += Receive_Completed;
PostReceive(args);
//
// Post another accept
//
StartAccept(e);
}
//
// RECEIVE
//
/// <summary>
/// Post an asynchronous receive on the socket.
/// </summary>
/// <param name="e">Used to store information about the Receive call.</param>
private void PostReceive(SocketAsyncEventArgs e)
{
Connection connection = e.UserToken as Connection;
if (connection != null)
{
connection.ReceiveBuffer.EnsureCapacity(64);
e.SetBuffer(connection.ReceiveBuffer.DataBuffer, connection.ReceiveBuffer.Count, connection.ReceiveBuffer.Remaining);
if (connection.Socket.ReceiveAsync(e) == false)
{
Receive_Completed(this, e);
}
}
}
/// <summary>
/// Receive completion callback. Should verify the connection, and then notify any event listeners
/// that data has been received. For now it is always expected that the data will be handled by the
/// listeners and thus the buffer is cleared after every call.
/// </summary>
/// <param name="sender">object which posted the ReceiveAsync</param>
/// <param name="e">Information about the Receive call.</param>
private void Receive_Completed(object sender, SocketAsyncEventArgs e)
{
Connection connection = e.UserToken as Connection;
if (e.BytesTransferred == 0 || e.SocketError != SocketError.Success || connection == null)
{
Disconnect(e);
return;
}
connection.ReceiveBuffer.UpdateCount(e.BytesTransferred);
OnDataReceived(e);
HandleCommand(e);
Echo(e);
OnLineReceived(connection);
PostReceive(e);
}
/// <summary>
/// Handles Event of Data being Received.
/// </summary>
/// <param name="e">Information about the received data.</param>
protected void OnDataReceived(SocketAsyncEventArgs e)
{
if (DataReceived != null)
{
DataReceived(this, e);
}
}
/// <summary>
/// Handles Event of a Line being Received.
/// </summary>
/// <param name="connection">User connection.</param>
protected void OnLineReceived(Connection connection)
{
if (LineReceived != null)
{
int index = 0;
int start = 0;
while ((index = connection.ReceiveBuffer.IndexOf('\n', index)) != -1)
{
string s = connection.ReceiveBuffer.GetString(start, index - start - 1);
s = s.Backspace();
LineReceivedEventArgs args = new LineReceivedEventArgs(connection, s);
Delegate[] delegates = LineReceived.GetInvocationList();
foreach (Delegate d in delegates)
{
d.DynamicInvoke(new object[] { this, args });
if (args.Handled == true)
{
break;
}
}
if (args.Handled == false)
{
connection.CommandBuffer.Enqueue(s);
}
start = index;
index++;
}
if (start > 0)
{
connection.ReceiveBuffer.Reset(0, start + 1);
}
}
}
//
// SEND
//
/// <summary>
/// Overloaded. Sends a string over the telnet socket.
/// </summary>
/// <param name="connection">Connection to send data on.</param>
/// <param name="s">Data to send.</param>
/// <returns>true if the data was sent successfully.</returns>
public bool Send(Connection connection, string s)
{
if (String.IsNullOrEmpty(s) == false)
{
return Send(connection, Encoding.Default.GetBytes(s));
}
return false;
}
/// <summary>
/// Overloaded. Sends an array of data to the client.
/// </summary>
/// <param name="connection">Connection to send data on.</param>
/// <param name="data">Data to send.</param>
/// <returns>true if the data was sent successfully.</returns>
public bool Send(Connection connection, byte[] data)
{
return Send(connection, data, 0, data.Length);
}
public bool Send(Connection connection, char c)
{
return Send(connection, new byte[] { (byte)c }, 0, 1);
}
/// <summary>
/// Sends an array of data to the client.
/// </summary>
/// <param name="connection">Connection to send data on.</param>
/// <param name="data">Data to send.</param>
/// <param name="offset">Starting offset of date in the buffer.</param>
/// <param name="length">Amount of data in bytes to send.</param>
/// <returns></returns>
public bool Send(Connection connection, byte[] data, int offset, int length)
{
bool status = true;
if (connection.Socket == null || connection.Socket.Connected == false)
{
return false;
}
SocketAsyncEventArgs args = m_EventArgsPool.Pop();
args.UserToken = connection;
args.Completed += Send_Completed;
args.SetBuffer(data, offset, length);
try
{
if (connection.Socket.SendAsync(args) == false)
{
Send_Completed(this, args);
}
}
catch (ObjectDisposedException)
{
//
// return the SocketAsyncEventArgs back to the pool and return as the
// socket has been shutdown and disposed of
//
m_EventArgsPool.Push(args);
status = false;
}
return status;
}
/// <summary>
/// Sends a command telling the client that the server WILL echo data.
/// </summary>
/// <param name="connection">Connection to disable echo on.</param>
public void DisableEcho(Connection connection)
{
byte[] b = new byte[] { 255, 251, 1 };
Send(connection, b);
}
/// <summary>
/// Completion callback for SendAsync.
/// </summary>
/// <param name="sender">object which initiated the SendAsync</param>
/// <param name="e">Information about the SendAsync call.</param>
private void Send_Completed(object sender, SocketAsyncEventArgs e)
{
e.Completed -= Send_Completed;
m_EventArgsPool.Push(e);
}
/// <summary>
/// Handles a Telnet command.
/// </summary>
/// <param name="e">Information about the data received.</param>
private void HandleCommand(SocketAsyncEventArgs e)
{
Connection c = e.UserToken as Connection;
if (c == null || e.BytesTransferred < 3)
{
return;
}
for (int i = 0; i < e.BytesTransferred; i += 3)
{
if (e.BytesTransferred - i < 3)
{
break;
}
if (e.Buffer[i] == (int)TelnetCommand.IAC)
{
TelnetCommand command = (TelnetCommand)e.Buffer[i + 1];
TelnetOption option = (TelnetOption)e.Buffer[i + 2];
switch (command)
{
case TelnetCommand.DO:
if (option == TelnetOption.Echo)
{
// ECHO
}
break;
case TelnetCommand.WILL:
if (option == TelnetOption.Echo)
{
// ECHO
}
break;
}
c.ReceiveBuffer.Remove(i, 3);
}
}
}
/// <summary>
/// Echoes data back to the client.
/// </summary>
/// <param name="e">Information about the received data to be echoed.</param>
private void Echo(SocketAsyncEventArgs e)
{
Connection connection = e.UserToken as Connection;
if (connection == null)
{
return;
}
//
// backspacing would cause the cursor to proceed beyond the beginning of the input line
// so prevent this
//
string bs = connection.ReceiveBuffer.ToString();
if (bs.CountAfterBackspace() < 0)
{
return;
}
//
// find the starting offset (first non-backspace character)
//
int i = 0;
for (i = 0; i < connection.ReceiveBuffer.Count; i++)
{
if (connection.ReceiveBuffer[i] != '\b')
{
break;
}
}
string s = Encoding.Default.GetString(e.Buffer, Math.Max(e.Offset, i), e.BytesTransferred);
if (connection.Secure)
{
s = s.ReplaceNot("\r\n\b".ToCharArray(), '*');
}
s = s.Replace("\b", "\b \b");
Send(connection, s);
}
//
// DISCONNECT
//
/// <summary>
/// Disconnects a socket.
/// </summary>
/// <remarks>
/// It is expected that this disconnect is always posted by a failed receive call. Calling the public
/// version of this method will cause the next posted receive to fail and this will cleanup properly.
/// It is not advised to call this method directly.
/// </remarks>
/// <param name="e">Information about the socket to be disconnected.</param>
private void Disconnect(SocketAsyncEventArgs e)
{
Connection connection = e.UserToken as Connection;
if (connection == null)
{
throw (new ArgumentNullException("e.UserToken"));
}
try
{
connection.Socket.Shutdown(SocketShutdown.Both);
}
catch
{
}
connection.Socket.Close();
if (Disconnected != null)
{
Disconnected(this, e);
}
e.Completed -= Receive_Completed;
m_EventArgsPool.Push(e);
}
/// <summary>
/// Marks a specific connection for graceful shutdown. The next receive or send to be posted
/// will fail and close the connection.
/// </summary>
/// <param name="connection"></param>
public void Disconnect(Connection connection)
{
try
{
connection.Socket.Shutdown(SocketShutdown.Both);
}
catch (Exception)
{
}
}
/// <summary>
/// Telnet command codes.
/// </summary>
internal enum TelnetCommand
{
SE = 240,
NOP = 241,
DM = 242,
BRK = 243,
IP = 244,
AO = 245,
AYT = 246,
EC = 247,
EL = 248,
GA = 249,
SB = 250,
WILL = 251,
WONT = 252,
DO = 253,
DONT = 254,
IAC = 255
}
/// <summary>
/// Telnet command options.
/// </summary>
internal enum TelnetOption
{
Echo = 1,
SuppressGoAhead = 3,
Status = 5,
TimingMark = 6,
TerminalType = 24,
WindowSize = 31,
TerminalSpeed = 32,
RemoteFlowControl = 33,
LineMode = 34,
EnvironmentVariables = 36
}
}
Run Code Online (Sandbox Code Playgroud)
jer*_*jvl 46
曾经有过使用由Coversant的Chris Mullins编写的.NET的可扩展TCP/IP的非常好的讨论,不幸的是,他的博客似乎已经从之前的位置消失了,所以我将尝试将他的建议从内存中拼凑起来(一些有用的评论)他出现在这个帖子中:C++与C#:开发高度可扩展的IOCP服务器)
首先,请注意,使用Begin/End和类Async上的方法Socket都使用IO完成端口(IOCP)来提供可伸缩性.这比实际选择实现解决方案的两种方法中的哪种方法具有更大的差异(正确使用时;见下文).
Chris Mullins的帖子基于使用Begin/End,这是我个人经历过的.请注意,Chris将基于此的解决方案放在一起,在具有2GB内存的32位计算机上扩展到10,000个并发客户端连接,在具有足够内存的64位平台上扩展到100,000个.根据我自己使用这种技术的经验(尽管没有这种负荷),我没有理由怀疑这些指示性数字.
您想要使用IOCP的机制的原因是它使用一个非常低级别的Windows线程池,在您尝试读取IO通道上的实际数据之前,它不会唤醒任何线程(请注意,IOCP也可用于文件IO).这样做的好处是,Windows无需切换到线程,只是为了发现还没有数据,所以这样可以减少服务器必须进行的上下文切换次数.
上下文切换肯定会杀死"每个连接的线程"机制,尽管如果你只处理几十个连接,这是一个可行的解决方案.然而,这种机制并没有想象力的"可扩展性".
记忆
首先,了解如果您的实现过于幼稚,IOCP很容易导致.NET下的内存问题,这一点至关重要.每次IOCP BeginReceive调用都会导致您正在读取的缓冲区"固定".为了更好地解释为什么这是一个问题,请参阅:Yun Jin的Weblog:OutOfMemoryException和Pinning.
幸运的是,这个问题可以避免,但需要一些权衡.建议的解决方案是byte[]在应用程序启动时(或接近它)分配一个大缓冲区,至少90KB左右(从.NET 2开始,在以后的版本中所需的大小可能会更大).这样做的原因是大内存分配自动结束于有效自动固定的非压缩内存段(大对象堆).通过在启动时分配一个大缓冲区,您可以确保此不可移动内存块处于相对"低地址"的位置,在此处它不会妨碍并导致碎片.
然后,您可以使用偏移量将这个大缓冲区分割为需要读取某些数据的每个连接的单独区域.这是权衡发挥作用的地方; 由于此缓冲区需要预先分配,因此您必须确定每个连接需要多少缓冲区空间,以及要为要扩展的连接数设置的上限(或者,您可以实现抽象)一旦需要,可以分配额外的固定缓冲区).
最简单的解决方案是在此缓冲区内的唯一偏移量处为每个连接分配单个字节.然后,您可以BeginReceive调用单个字节进行读取,并根据您获得的回调执行剩余的读取操作.
处理
当您从Begin调用中获得回调时,非常重要的是要意识到回调中的代码将在低级IOCP线程上执行.在此回调中避免冗长的操作是绝对必要的.使用这些线程进行复杂处理会像使用"每个连接线程"一样有效地破坏您的可伸缩性.
建议的解决方案是仅使用回调来排队工作项以处理传入的数据,该数据将在其他某个线程上执行.避免回调中的任何潜在阻塞操作,以便IOCP线程可以尽快返回其池.在.NET 4.0中,我建议最简单的解决方案是生成一个Task,给它一个对客户端套接字的引用和一个已经被BeginReceive调用读取的第一个字节的副本.然后,此任务负责从套接字读取表示正在处理的请求,执行它的所有数据,然后再次进行新的BeginReceive调用以便为IOCP排队套接字.在.NET 4.0之前,您可以使用ThreadPool,或创建自己的线程工作队列实现.
基本上,我建议使用Kevin的示例代码来解决此问题,并添加以下警告:
BeginReceive已被"固定"BeginReceive只是排队任务以处理传入数据的实际处理当你这样做时,我毫不怀疑你可以复制克里斯的结果,扩展到可能成千上万的同时客户端(给定合适的硬件并有效实现你自己的处理代码;)
Rem*_*anu 22
您已经通过上面的代码示例获得了大部分答案.使用异步IO操作绝对是这里的方法.Async IO是Win32内部扩展设计的方式.使用完成端口可以获得最佳性能,将套接字绑定到完成端口并使线程池等待完成端口完成.常识是每个CPU(核心)有2-4个线程等待完成.我强烈建议您阅读Windows性能团队的Rick Vicik撰写的这三篇文章:
所述文章主要涵盖原生Windows API,但对于任何试图掌握可伸缩性和性能的人来说,它们都是必读的.他们确实在管理方面也有一些简要说明.
您需要做的第二件事是确保您查看可在线获得的" 改进的.NET应用程序性能和可伸缩性"一书.您将在第5章中找到关于线程,异步调用和锁的使用的相关且有效的建议.但真正的宝石在第17章中,您将找到有关调整线程池的实用指南的好处.在我根据本章的建议调整maxIothreads/maxWorkerThreads之前,我的应用程序存在一些严重问题.
你说你想做一个纯TCP服务器,所以我的下一点是假的.但是,如果您发现自己已经走投无路并使用WebRequest类及其衍生产品,请注意有一条守卫该门的龙:ServicePointManager.这是一个生活中有一个目的的配置类:破坏你的性能.确保将服务器从人为强加的ServicePoint.ConnectionLimit中解放出来,否则您的应用程序将无法扩展(我让您发现自己的默认值是什么......).您还可以重新考虑在http请求中发送Expect100Continue标头的默认策略.
现在关于核心套接字托管API在发送方面相当容易,但它们在接收方面要复杂得多.为了实现高吞吐量和扩展,必须确保套接字不受流控制,因为您没有为接收发布缓冲区.理想情况下,为了获得高性能,您应该提前发送3-4个缓冲区,并在获得一个缓冲区后立即发布新缓冲区(在您处理一个缓冲区之前),这样您就可以确保套接字始终存放在某处以存放来自网络的数据.你会明白为什么你很快就无法实现这一目标.
在您使用BeginRead/BeginWrite API完成并开始认真工作后,您将意识到您需要对流量进行安全保护,即.NTLM/Kerberos身份验证和流量加密,或至少是流量篡改保护.这样做的方法是使用内置的System.Net.Security.NegotiateStream(如果需要跨越不同的域,则使用SslStream).这意味着您将依赖AuthenticatedStream异步操作,而不是依赖于直接套接字异步操作.一旦获得套接字(来自客户端上的连接或来自服务器上的接受),您就可以通过调用BeginAuthenticateAsClient或BeginAuthenticateAsServer在套接字上创建流并提交它以进行身份验证.认证完成后(至少你的安全,从本机InitiateSecurityContext/AcceptSecurityContext疯狂......),你会通过检查你的身份验证流的RemoteIdentity财产和做什么ACL验证你的产品必须支持做你的授权.之后,您将使用BeginWrite发送消息,并且您将使用BeginRead接收消息.这是我之前讨论的问题,你将无法发布多个接收缓冲区,因为AuthenticateStream类不支持这个.BeginRead操作在内部管理所有IO,直到您收到整个帧,否则它无法处理消息身份验证(解密帧并验证帧上的签名).虽然根据我的经验,AuthenticatedStream类完成的工作相当不错,但不应该有任何问题.IE浏览器.你应该能够只用4-5%的CPU来使GB网络饱和.AuthenticatedStream类还会对您施加协议特定的帧大小限制(SSL为16k,Kerberos为12k).
这应该让你开始走上正轨.我不打算在这里发布代码,在MSDN上有一个很好的例子.我做了很多像这样的项目,我能够扩展到大约1000个用户连接没有问题.在此之上,您需要修改注册表项以允许内核获得更多套接字句柄.并确保您部署在服务器操作系统上,即W2K3而不是XP或Vista(即客户端操作系统),它会产生很大的不同.
BTW确保你是否在服务器或文件IO上有数据库操作,你也使用它们的异步风格,或者你将立即消耗线程池.对于SQL Server连接,请确保将"Asyncronous Processing = true"添加到连接字符串.
jva*_*erh 11
我在我的一些解决方案中运行了这样的服务器.以下是对.net中不同方法的详细解释: 使用.NET中的高性能套接字更接近线路
最近,我一直在寻找改进代码的方法,并将研究:" 版本3.5中的套接字性能增强 ",特别包含"供使用异步网络I/O以实现最高性能的应用程序".
"这些增强功能的主要特点是避免在高容量异步套接字I/O期间重复分配和同步对象.当前由Socket类为异步套接字I/O实现的Begin/End设计模式需要一个System.为每个异步套接字操作分配IAsyncResult对象."
如果您点击链接,您可以继续阅读.我个人将在明天测试他们的示例代码,以便根据我的情况对其进行基准测试.
编辑: 在这里,您可以使用新的3.5 SocketAsyncEventArgs找到客户端和服务器的工作代码,这样您就可以在几分钟内测试它并通过代码.这是一种简单的方法,但却是开始实施更大规模的基础.此外该文章中MSDN杂志从近两年前是一个有趣的阅读.
您是否考虑过仅使用WCF网络TCP绑定和发布/订阅模式?WCF将允许您[主要]关注您的域而不是管道..
在IDesign的下载部分有很多WCF示例甚至是发布/订阅框架,这可能很有用:http://www.idesign.net
我想知道一件事:
我绝对不想为每个连接启动一个线程.
这是为什么?至少从Windows 2000开始,Windows可以处理应用程序中的数百个线程.我已经完成了它,如果线程不需要同步,它就非常容易使用.特别是考虑到你正在做大量的I/O(所以你不受CPU限制,并且在磁盘或网络通信上会阻塞很多线程),我不明白这个限制.
您是否测试过多线程方式并发现它缺少某些东西?您是否打算为每个线程建立一个数据库连接(这会破坏数据库服务器,所以这是一个坏主意,但它可以通过3层设计轻松解决).你是否担心你会有成千上万的客户而不是数百人,然后你真的会有问题吗?(虽然如果我有32 GB以上的RAM,我会尝试一千个线程甚至一万个 - 再次,鉴于你没有CPU限制,线程切换时间应该绝对无关紧要.)
以下是代码 - 要查看其运行情况,请访问http://mdpopescu.blogspot.com/2009/05/multi-threaded-server.html并单击图片.
服务器类:
public class Server
{
private static readonly TcpListener listener = new TcpListener(IPAddress.Any, 9999);
public Server()
{
listener.Start();
Console.WriteLine("Started.");
while (true)
{
Console.WriteLine("Waiting for connection...");
var client = listener.AcceptTcpClient();
Console.WriteLine("Connected!");
// each connection has its own thread
new Thread(ServeData).Start(client);
}
}
private static void ServeData(object clientSocket)
{
Console.WriteLine("Started thread " + Thread.CurrentThread.ManagedThreadId);
var rnd = new Random();
try
{
var client = (TcpClient) clientSocket;
var stream = client.GetStream();
while (true)
{
if (rnd.NextDouble() < 0.1)
{
var msg = Encoding.ASCII.GetBytes("Status update from thread " + Thread.CurrentThread.ManagedThreadId);
stream.Write(msg, 0, msg.Length);
Console.WriteLine("Status update from thread " + Thread.CurrentThread.ManagedThreadId);
}
// wait until the next update - I made the wait time so small 'cause I was bored :)
Thread.Sleep(new TimeSpan(0, 0, rnd.Next(1, 5)));
}
}
catch (SocketException e)
{
Console.WriteLine("Socket exception in thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, e);
}
}
}
Run Code Online (Sandbox Code Playgroud)
服务器主程序:
namespace ManyThreadsServer
{
internal class Program
{
private static void Main(string[] args)
{
new Server();
}
}
}
Run Code Online (Sandbox Code Playgroud)
客户类:
public class Client
{
public Client()
{
var client = new TcpClient();
client.Connect(IPAddress.Loopback, 9999);
var msg = new byte[1024];
var stream = client.GetStream();
try
{
while (true)
{
int i;
while ((i = stream.Read(msg, 0, msg.Length)) != 0)
{
var data = Encoding.ASCII.GetString(msg, 0, i);
Console.WriteLine("Received: {0}", data);
}
}
}
catch (SocketException e)
{
Console.WriteLine("Socket exception in thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, e);
}
}
}
Run Code Online (Sandbox Code Playgroud)
客户主程序:
using System;
using System.Threading;
namespace ManyThreadsClient
{
internal class Program
{
private static void Main(string[] args)
{
// first argument is the number of threads
for (var i = 0; i < Int32.Parse(args[0]); i++)
new Thread(RunClient).Start();
}
private static void RunClient()
{
new Client();
}
}
}
Run Code Online (Sandbox Code Playgroud)
BeginRead如果能够正确获取所有细节,使用.NET的集成Async IO(等)是个好主意.正确设置套接字/文件句柄后,它将使用操作系统的底层IOCP实现,允许您的操作在不使用任何线程的情况下完成(或者,在最坏的情况下,使用我相信来自内核的IO线程池的线程.NET的线程池,有助于缓解线程池拥塞.)
主要问题是确保以非阻塞模式打开套接字/文件.大多数默认的便利功能(如File.OpenRead)不会这样做,所以你需要自己编写.
另一个主要问题是错误处理 - 在编写异步I/O代码时正确处理错误比在同步代码中执行错误要困难得多.即使您可能没有直接使用线程,也很容易最终遇到竞争条件和死锁,因此您需要了解这一点.
如果可能,您应该尝试使用便利库来简化可伸缩异步IO的过程.
Microsoft的并发协调运行时是.NET库的一个示例,旨在减轻执行此类编程的难度.它看起来很棒,但由于我没有使用它,我无法评论它的扩展程度.
对于我需要进行异步网络或磁盘I/O的个人项目,我使用了一套我在过去一年中构建的.NET并发/ IO工具,称为Squared.Task.它的灵感来自像imvu.task和twisted这样的库,我在存储库中包含了一些做网络I/O的工作示例.我也在我编写的一些应用程序中使用它 - 最大的公开发布的是NDexer(它用于无线磁盘I/O).该库是根据我对imvu.task的经验编写的,并且有一套相当全面的单元测试,因此我强烈建议你试一试.如果您有任何问题,我很乐意为您提供帮助.
在我看来,基于我使用异步/无线IO而不是线程的经验在.NET平台上是值得的,只要你准备好处理学习曲线.它允许您避免Thread对象的成本所带来的可伸缩性麻烦,并且在许多情况下,您可以通过仔细使用Futures/Promises等并发原语来完全避免使用锁和互斥锁.