ren*_*ren 1 c# sockets .net-core
我有用经典的 .net 编写的客户端和服务器,通过 TCP 套接字进行通信。我还进行了许多并行连接的负载测试,并且一切正常。
但是,使用 .netcore 的相同代码会中断。在 linux 上,客户端在尝试从流中读取时总是出现异常:
客户端套接字错误:无法从传输连接读取数据:连接超时。
或者服务器也可能返回 0 作为要读取的字节。
在 Windows 上,.netcore 客户端中断的频率较低,但有时仍会中断并显示如下错误:
socket error: Unable to read data from the transport connection: 连接尝试失败,因为连接方在一段时间后没有正确响应,或者建立连接失败,因为连接的主机没有响应
.netcore 3.0 顺便说一下。
任何想法为什么会发生这种情况?
客户:
public class TcpConnection
{
object _lock = new object();
bool _is_busy = false;
public bool TakeLock()
{
lock (_lock)
{
if (_is_busy)
{
return false;
}
else
{
_is_busy = true;
return true;
}
}
}
public void ReleaseLock()
{
_is_busy = false;
}
public bool Connected { get; set; }
public string ConnError { get; set; }
public Socket client { get; set; }
public Stream stream { get; set; }
public BinaryWriter bw { get; set; }
public DateTime LastUsed { get; set; }
public int Index { get; set; }
public TcpConnection(string hostname, int port)
{
client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
SocketAsyncEventArgs connectEventArg = new SocketAsyncEventArgs();
connectEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(ConnectedEvent);
connectEventArg.UserToken = this;
connectEventArg.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(hostname), port);
var connected = client.ConnectAsync(connectEventArg);
if (!connected)
{
if (connectEventArg.SocketError != SocketError.Success)
{
#if (VERBOSE)
Console.WriteLine("Connection error (immediate)");
#endif
throw new LinqDbException("Linqdb: Connection error (immediate)");
}
#if (VERBOSE)
Console.WriteLine("Connected immediately");
#endif
//client.NoDelay = true;
client.ReceiveTimeout = 60000;
client.SendTimeout = 60000;
this.stream = new NetworkStream(client);
this.bw = new BinaryWriter(stream);
}
else
{
int total_wait_ms = 0;
while (!this.Connected)
{
Thread.Sleep(100);
total_wait_ms += 100;
#if (VERBOSE)
if (total_wait_ms % 2000 == 0)
{
Console.WriteLine("Can't connect in {0} ms", total_wait_ms);
}
#endif
}
if (!string.IsNullOrEmpty(this.ConnError))
{
throw new LinqDbException(this.ConnError + " after " + total_wait_ms + " ms wait time");
}
else
{
#if (VERBOSE)
Console.WriteLine("Connected {0} ms", total_wait_ms);
#endif
}
}
_is_busy = true;
LastUsed = DateTime.Now;
}
private void ConnectedEvent(object sender, SocketAsyncEventArgs e)
{
TcpConnection conn = e.UserToken as TcpConnection;
if (e.SocketError != SocketError.Success)
{
#if (VERBOSE)
Console.WriteLine("Connection error");
#endif
conn.ConnError = "Connection error";
conn.Connected = true;
return;
}
//e.ConnectSocket.NoDelay = true;
e.ConnectSocket.ReceiveTimeout = 60000;
e.ConnectSocket.SendTimeout = 60000;
conn.stream = new NetworkStream(conn.client);
conn.bw = new BinaryWriter(conn.stream);
conn.ConnError = null;
conn.Connected = true;
}
}
public class ClientSockets
{
const int _limit = 100;
TcpConnection[] cons = new TcpConnection[_limit];
object _lock = new object();
object[] _locks = null;
public byte[] CallServer(byte[] input, string hostname, int port, out string error_msg)
{
error_msg = null;
if (_locks == null)
{
lock (_lock)
{
if (_locks == null)
{
_locks = new object[_limit];
for (int i = 0; i < _limit; i++)
{
_locks[i] = new object();
}
}
}
}
TcpConnection conn = null;
while (true)
{
int last_index = 0;
for (int i = _limit - 1; i >= 0; i--)
{
if (cons[i] != null)
{
last_index = i;
break;
}
}
for (int i = 0; i < _limit; i++)
{
var tmp = cons[i];
if (tmp != null)
{
var available = tmp.TakeLock();
if (!available)
{
continue;
}
else
{
if ((DateTime.Now - tmp.LastUsed).TotalSeconds > 30)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
else
{
//ping
tmp.bw.Write(BitConverter.GetBytes(-3));
tmp.bw.Flush();
int numBytesRead = 0;
var data = new byte[1024];
var bad = false;
while (numBytesRead < 4)
{
int read = 0;
try
{
read = tmp.stream.Read(data, numBytesRead, data.Length - numBytesRead);
}
catch (Exception ex)
{
//server closed connection
bad = true;
break;
}
numBytesRead += read;
if (read <= 0)
{
//server closed connection
bad = true;
break;
}
}
if (bad)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
var pong = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
if (pong != -3)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
//socket is ok
conn = tmp;
break;
}
}
}
else
{
if (i < last_index)
{
continue;
}
if (Monitor.TryEnter(_locks[i]))
{
try
{
if (cons[i] != null)
{
continue;
}
conn = new TcpConnection(hostname, port);
cons[i] = conn;
conn.Index = i;
break;
}
catch (Exception ex)
{
conn = null;
cons[i] = null;
#if (VERBOSE)
Console.WriteLine("Client socket creation error: " + ex.Message);
#endif
error_msg = ex.Message;
return BitConverter.GetBytes(-1);
}
finally
{
Monitor.Exit(_locks[i]);
}
}
else
{
continue;
}
}
}
if (conn == null)
{
Thread.Sleep(150);
continue;
}
else
{
break;
}
}
bool error = false;
try
{
var length = BitConverter.GetBytes(input.Length);
var data = new byte[1024];
conn.bw.Write(input);
conn.bw.Flush();
using (MemoryStream ms = new MemoryStream())
{
int numBytesRead;
int total;
while (true)
{
numBytesRead = 0;
while (numBytesRead < 4)
{
int read = conn.stream.Read(data, numBytesRead, data.Length - numBytesRead);
numBytesRead += read;
if (read <= 0)
{
throw new LinqDbException("Read <= 0: " + read);
}
}
numBytesRead -= 4;
total = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
if (total == -2)
{
#if (VERBOSE)
Console.WriteLine("PINGER!!!");
#endif
continue;
}
break;
}
if (numBytesRead > 0)
{
var finput = new byte[numBytesRead];
for (int i = 0; i < numBytesRead; i++)
{
finput[i] = data[4 + i];
}
ms.Write(finput, 0, numBytesRead);
}
total -= numBytesRead;
while (total > 0)
{
numBytesRead = conn.stream.Read(data, 0, data.Length);
if (numBytesRead <= 0)
{
throw new LinqDbException("numBytesRead <= 0: " + numBytesRead);
}
ms.Write(data, 0, numBytesRead);
total -= numBytesRead;
}
conn.LastUsed = DateTime.Now;
return ms.ToArray();
}
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Client socket error: " + ex.Message);
#endif
error = true;
error_msg = ex.Message;
return BitConverter.GetBytes(-1);
}
finally
{
if (!error)
{
conn.ReleaseLock();
}
else
{
cons[conn.Index] = null;
try
{
conn.client.Dispose();
conn.stream.Dispose();
conn.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
服务器:
class Pinger
{
public bool Done { get; set; }
public object _lock = new object();
public BinaryWriter bw { get; set; }
public void Do()
{
try
{
int total_wait = 0;
int sleep_ms = 2000;
while (!Done)
{
Thread.Sleep(sleep_ms);
total_wait += sleep_ms;
if (total_wait % 10000 == 0)
{
lock (_lock)
{
if (!Done)
{
bw.Write(BitConverter.GetBytes(-2));
bw.Flush();
}
}
}
}
}
catch { return; }
}
}
class ServerSockets
{
static Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
static string db_path = null;
static int port = 0;
public static void Main()
{
AppDomain.CurrentDomain.ProcessExit += new EventHandler(OnProcessExit);
CommandHelper.ReadConfig(out db_path, out port);
var sw = new Stopwatch();
sw.Start();
Console.WriteLine("Building in-memory indexes...");
ServerLogic.Logic.ServerBuildIndexesOnStart(db_path);
sw.Stop();
Console.WriteLine("Done building in-memory indexes. It took: " + Math.Round(sw.ElapsedMilliseconds / 60000.0, 0) + " min.");
Console.WriteLine("Listening on " + port);
listener.Bind(new IPEndPoint(IPAddress.Any, port));
listener.Listen((int)SocketOptionName.MaxConnections);
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
Service(null, acceptEventArg);
}
while (true)
{
try
{
Thread.Sleep(60000);
#if (VERBOSE)
Console.WriteLine("Still kicking...");
#endif
}
catch (Exception ex)
{
Console.WriteLine("BAD ERROR... " + ex.Message);
}
}
}
static void OnProcessExit(object sender, EventArgs e)
{
ServerLogic.Logic.Dispose();
}
private static void LoopToStartAccept()
{
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
Service(null, acceptEventArg);
}
}
private static void HandleBadAccept(SocketAsyncEventArgs acceptEventArgs)
{
#if (VERBOSE)
Console.WriteLine("bad accept");
#endif
acceptEventArgs.AcceptSocket.Dispose();
}
private static void Service(object sender, SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success)
{
LoopToStartAccept();
HandleBadAccept(e);
return;
}
LoopToStartAccept();
try
{
using (Socket soc = e.AcceptSocket)
{
var rg = new Random();
#if (VERBOSE)
Console.WriteLine("New socket: " + rg.Next(0, 1000000));
#endif
//soc.NoDelay = true;
soc.ReceiveTimeout = 60000;
soc.SendTimeout = 60000;
using (Stream stream = new NetworkStream(soc))
using (BinaryWriter bw = new BinaryWriter(stream))
{
while (true) //reuse same connection for many commands
{
byte[] data = new byte[1024];
using (MemoryStream ms = new MemoryStream())
{
int numBytesRead = 0;
while (numBytesRead < 4)
{
int read = 0;
我调试了您的代码,看起来问题是错误而不是 .NET 核心中的套接字问题。从代码看来,您希望发送的数据的前四个字节将包含数据的长度,但您只发送数据。这会导致您获得随机数据长度,因为数据的前四个字节用作长度。在某些情况下,这大于实际数据,然后读取数据超时的 while 循环等待更多永远不会到达的数据。
这是客户端代码中的问题部分:
var length = BitConverter.GetBytes(input.Length); // You prepare the length
var data = new byte[1024];
conn.bw.Write(length); // This is missing in your code so it never gets sent to the server
conn.bw.Write(input);
conn.bw.Flush();
Run Code Online (Sandbox Code Playgroud)
包含修复的完整客户端:
public class ClientSockets
{
const int _limit = 100;
TcpConnection[] cons = new TcpConnection[_limit];
object _lock = new object();
object[] _locks = null;
public byte[] CallServer(byte[] input, string hostname, int port, out string error_msg)
{
error_msg = null;
if (_locks == null)
{
lock (_lock)
{
if (_locks == null)
{
_locks = new object[_limit];
for (int i = 0; i < _limit; i++)
{
_locks[i] = new object();
}
}
}
}
TcpConnection conn = null;
while (true)
{
int last_index = 0;
for (int i = _limit - 1; i >= 0; i--)
{
if (cons[i] != null)
{
last_index = i;
break;
}
}
for (int i = 0; i < _limit; i++)
{
var tmp = cons[i];
if (tmp != null)
{
var available = tmp.TakeLock();
if (!available)
{
continue;
}
else
{
if ((DateTime.Now - tmp.LastUsed).TotalSeconds > 30)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
else
{
//ping
tmp.bw.Write(BitConverter.GetBytes(-3));
tmp.bw.Flush();
int numBytesRead = 0;
var data = new byte[1024];
var bad = false;
while (numBytesRead < 4)
{
int read = 0;
try
{
read = tmp.stream.Read(data, numBytesRead, data.Length - numBytesRead);
}
catch (Exception ex)
{
//server closed connection
bad = true;
break;
}
numBytesRead += read;
if (read <= 0)
{
//server closed connection
bad = true;
break;
}
}
if (bad)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
var pong = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
if (pong != -3)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
//socket is ok
conn = tmp;
break;
}
}
}
else
{
if (i < last_index)
{
continue;
}
if (Monitor.TryEnter(_locks[i]))
{
try
{
if (cons[i] != null)
{
continue;
}
conn = new TcpConnection(hostname, port);
cons[i] = conn;
conn.Index = i;
break;
}
catch (Exception ex)
{
conn = null;
cons[i] = null;
#if (VERBOSE)
Console.WriteLine("Client socket creation error: " + ex.Message);
#endif
error_msg = ex.Message;
return BitConverter.GetBytes(-1);
}
finally
{
Monitor.Exit(_locks[i]);
}
}
else
{
continue;
}
}
}
if (conn == null)
{
Thread.Sleep(150);
continue;
}
else
{
break;
}
}
bool error = false;
try
{
var length = BitConverter.GetBytes(input.Length);
var data = new byte[1024];
conn.bw.Write(length); // Send the length first.
conn.bw.Write(input);
conn.bw.Flush();
using (MemoryStream ms = new MemoryStream())
{
int numBytesRead;
int total;
while (true)
{
numBytesRead = 0;
while (numBytesRead < 4)
{
int read = conn.stream.Read(data, numBytesRead, data.Length - numBytesRead);
numBytesRead += read;
if (read <= 0)
{
throw new LinqDbException("Read <= 0: " + read);
}
}
numBytesRead -= 4;
total = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
if (total == -2)
{
#if (VERBOSE)
Console.WriteLine("PINGER!!!");
#endif
continue;
}
break;
}
if (numBytesRead > 0)
{
var finput = new byte[numBytesRead];
for (int i = 0; i < numBytesRead; i++)
{
finput[i] = data[4 + i];
}
ms.Write(finput, 0, numBytesRead);
}
total -= numBytesRead;
while (total > 0)
{
numBytesRead = conn.stream.Read(data, 0, data.Length);
if (numBytesRead <= 0)
{
throw new LinqDbException("numBytesRead <= 0: " + numBytesRead);
}
ms.Write(data, 0, numBytesRead);
total -= numBytesRead;
}
conn.LastUsed = DateTime.Now;
return ms.ToArray();
}
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Client socket error: " + ex.Message);
#endif
error = true;
error_msg = ex.Message;
return BitConverter.GetBytes(-1);
}
finally
{
if (!error)
{
conn.ReleaseLock();
}
else
{
cons[conn.Index] = null;
try
{
conn.client.Dispose();
conn.stream.Dispose();
conn.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
同样的问题也存在于服务器代码中:
var output = ServerLogic.Logic.Execute(input, db_path);
var length = BitConverter.GetBytes(output.Length); // You again need to get the length
pinger.Done = true;
lock (pinger._lock)
{
bw.Write(length); // Send it before the data
bw.Write(output);
bw.Flush();
}
Run Code Online (Sandbox Code Playgroud)
包含修复的完整服务器:
class ServerSockets
{
static Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
static string db_path = null;
static int port = 0;
public static void Main()
{
AppDomain.CurrentDomain.ProcessExit += new EventHandler(OnProcessExit);
CommandHelper.ReadConfig(out db_path, out port);
var sw = new Stopwatch();
sw.Start();
Console.WriteLine("Building in-memory indexes...");
ServerLogic.Logic.ServerBuildIndexesOnStart(db_path);
sw.Stop();
Console.WriteLine("Done building in-memory indexes. It took: " + Math.Round(sw.ElapsedMilliseconds / 60000.0, 0) + " min.");
Console.WriteLine("Listening on " + port);
listener.Bind(new IPEndPoint(IPAddress.Any, port));
listener.Listen((int)SocketOptionName.MaxConnections);
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
Service(null, acceptEventArg);
}
while (true)
{
try
{
Thread.Sleep(60000);
#if (VERBOSE)
Console.WriteLine("Still kicking...");
#endif
}
catch (Exception ex)
{
Console.WriteLine("BAD ERROR... " + ex.Message);
}
}
}
static void OnProcessExit(object sender, EventArgs e)
{
ServerLogic.Logic.Dispose();
}
private static void LoopToStartAccept()
{
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
Service(null, acceptEventArg);
}
}
private static void HandleBadAccept(SocketAsyncEventArgs acceptEventArgs)
{
#if (VERBOSE)
Console.WriteLine("bad accept");
#endif
acceptEventArgs.AcceptSocket.Dispose();
}
private static void Service(object sender, SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success)
{
LoopToStartAccept();
HandleBadAccept(e);
return;
}
LoopToStartAccept();
try
{
using (Socket soc = e.AcceptSocket)
{
var rg = new Random();
#if (VERBOSE)
Console.WriteLine("New socket: " + rg.Next(0, 1000000));
#endif
//soc.NoDelay = true;
soc.ReceiveTimeout = 60000;
soc.SendTimeout = 60000;
using (Stream stream = new NetworkStream(soc))
using (BinaryWriter bw = new BinaryWriter(stream))
{
while (true) //reuse same connection for many commands
{
byte[] data = new byte[1024];
using (MemoryStream ms = new MemoryStream())
{
int numBytesRead = 0;
while (numBytesRead < 4)
{
int read = 0;
try
{
read = stream.Read(data, numBytesRead, data.Length - numBytesRead);
}
catch (Exception ex)
{
//client closed connection
return;
}
numBytesRead += read;
if (read <= 0)
{
//throw new Exception("Read <= 0: " + read);
//client closed connection
return;
}
}
numBytesRead -= 4;
var total = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
if (total == -3) //ping
{
//pong
bw.Write(BitConverter.GetBytes(-3));
bw.Flush();
continue;
}
if (numBytesRead > 0)
{
var finput = new byte[numBytesRead];
for (int i = 0; i < numBytesRead; i++)
{
finput[i] = data[4 + i];
}
ms.Write(finput, 0, numBytesRead);
}
total -= numBytesRead;
while (total > 0)
{
numBytesRead = stream.Read(data, 0, data.Length);
if (numBytesRead <= 0)
{
throw new Exception("numBytesRead <= 0: " + numBytesRead);
}
ms.Write(data, 0, numBytesRead);
total -= numBytesRead;
}
var input = ms.ToArray();
var pinger = new Pinger()
{
bw = bw
};
ThreadPool.QueueUserWorkItem(f => { pinger.Do(); });
var output = ServerLogic.Logic.Execute(input, db_path);
var length = BitConverter.GetBytes(output.Length);
pinger.Done = true;
lock (pinger._lock)
{
bw.Write(length);
bw.Write(output);
bw.Flush();
}
}
}
}
}
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Socket error: " + ex.Message);
#endif
//try
//{
// var rg = new Random();
// File.WriteAllText("sock_error_" + rg.Next() + ".txt", ex.Message + " " + ex.StackTrace + (ex.InnerException != null ? (" " + ex.InnerException.Message + " " + ex.InnerException.StackTrace) : ""));
//}
//catch (Exception) { }
return;
}
finally
{
#if (VERBOSE)
Console.WriteLine("Listener finally ");
#endif
}
}
}
Run Code Online (Sandbox Code Playgroud)
对我来说,这两个更改代码适用于 Windows 和 Linux。如果您仍有问题,您还需要提供更多详细信息,您将发送哪些数据。
在您发布 GitHub 示例后,我对 Linux 进行了更多研究。正如我在下面的评论中已经建议的那样,您应该使用Connect方法而不是ConnectAsync来获得更好的行为。在我对 Raspbian 和 Arch Linux 的测试中,一旦您在客户端中打开大量线程,您可能会占用所有 ThreadPool,因此 Connect 事件停止发生并且一切都超时。由于您的代码在线程上循环并等待,因此无论如何使用 Async 方法都不会获得太多收益。
这是TcpConnection进行同步连接的更改类:
public class TcpConnection
{
object _lock = new object();
bool _is_busy = false;
public bool TakeLock()
{
lock (_lock)
{
if (_is_busy)
{
return false;
}
else
{
_is_busy = true;
return true;
}
}
}
public void ReleaseLock()
{
_is_busy = false;
}
public bool Connected { get; set; }
public string ConnError { get; set; }
public Socket client { get; set; }
public Stream stream { get; set; }
public BinaryWriter bw { get; set; }
public DateTime LastUsed { get; set; }
public int Index { get; set; }
public TcpConnection(string hostname, int port)
{
client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.Connect(new IPEndPoint(IPAddress.Parse(hostname), port));
if (client.Connected)
{
#if (VERBOSE)
Console.WriteLine("Connected immediately");
#endif
//client.NoDelay = true;
client.ReceiveTimeout = 60000;
client.SendTimeout = 60000;
this.stream = new NetworkStream(client);
this.bw = new BinaryWriter(stream);
}
_is_busy = true;
LastUsed = DateTime.Now;
}
}
Run Code Online (Sandbox Code Playgroud)
作为最后的评论,我不得不说您的代码非常复杂,并且没有遵循其他人已经指出的最佳实践。我建议您了解有关多线程和异步编程的更多信息,然后改进代码,这也将使其工作得更好、更可预测。
以下是有关如何异步使用套接字的 Microsoft 示例以及有关如何进行异步编程的一般文档的链接:
小智 5
首先让我说代码非常难以理解。不仅在一个方法中发生了很多事情,您还要处理线程和(很多)锁。这使事情变得异常复杂,并且很难查明发生了什么。
就像大海捞针一样。除了由于超时,线程和不断移动指针的锁定机制而恶化一百倍。
我已经在一Windows 10台机器上运行了这个项目,Visual Studio 2019 Preview (16.4.0 preview 2.0)和 .NET Core 运行时3.1.0-preview1(目标框架是3.0)。
不过,让我说明一件事,您已将timeout客户端和服务器的接收/设置设置为60000. 我无法使用此重现您的错误。因为这个超时数太大了,一切最终都在超时发生之前就完成了。
为了重现您的错误,我大大降低了超时时间:
// File: ClientLib.cs
// Method: "ConnectedEvent(...)"
// Line 114-115
e.ConnectSocket.ReceiveTimeout = 1000;
// Lowering it even more multiplies the errors.
Run Code Online (Sandbox Code Playgroud)
上面现在给了我大约 5 个错误,然后又因为这个代码(Pinger实例化的地方)而再次发生:
// File: Server\Program.cs
// Method: "Service(...)"
Thread.Sleep(random.Next(100, 1000));
Run Code Online (Sandbox Code Playgroud)
上面还发生了以下情况:
ThreadPool.QueueUserWorkItem(f => { pinger.Do(); });
Run Code Online (Sandbox Code Playgroud)
服务器使用ThreadPool并且它可以在其上并发处理8 个操作1。您不仅ThreadPool在建立Socket连接时使用,而且在执行ThreadPool.QueueUserWorkItem(...).
当客户端开始尝试建立大量连接时,这意味着大麻烦。因为这也可能导致超时。
增加线程的一种方法ThreadPool:
public static void Main()
{
int w, c = 0;
ThreadPool.GetMinThreads(out w, out c);
Console.WriteLine($"ThreadPool max: {w} worker threads {c} completion threads");
ThreadPool.SetMinThreads(100, 100);
ThreadPool.GetMinThreads(out w, out c);
Console.WriteLine($"ThreadPool max: {w} worker threads {c} completion threads");
/// other code...
}
Run Code Online (Sandbox Code Playgroud)
但实际上,只需删除这两行即可解决问题:
var random = new Random();
Thread.Sleep(random.Next(100, 1000));
Run Code Online (Sandbox Code Playgroud)
1这可能因机器而异,请参阅文档。您可以使用ThreadPool.GetMinThreads(...)在您的机器上检查它。
| 归档时间: |
|
| 查看次数: |
973 次 |
| 最近记录: |