20使用SocketAsyncEventArgs每秒接收

Kav*_*ian 9 c# sockets windows asynchronous socketasynceventargs

使用SocketAsyncEventArgs开发TCP服务器,它是Windows服务的异步方法.我在Main的开头有这两行代码:

ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
Run Code Online (Sandbox Code Playgroud)

并且都返回true(记录返回值).现在2000到3000个客户端开始向此服务器发送消息,它开始接受连接(我计算连接数,它是预期的 - 有一个连接池).服务器进程的线程数将增长到~2050到~3050.到现在为止还挺好!

现在有一个Received方法,在ReceiveAsync返回true或SocketAsyncEventArgs的Completed事件后调用.

问题就此开始了:无论客户端连接多少以及发送的消息数量多少,接收的消息最多只能在一秒钟内调用20次!随着客户数量的增加,这个数字(20)下降到~10.

环境:TCP服务器和客户端正在同一台计算机上进行模拟.我在两台机器上测试了代码,一台有2核CPU和4GB RAM,另一台有8核CPU和12GB RAM.(还没有)数据丢失,有时我在每次接收操作中都会收到多条消息.没关系.但是如何才能增加接收操作的数量?

关于实现的附加说明:代码很大,包含许多不同的逻辑.总体描述如下:我有一个SocketAsyncEventArgs用于接受新连接.它很棒.现在,对于每个新接受的连接,我创建一个新的SocketAsyncEventArgs来接收数据.我把这个(为接收创建的SocketAsyncEventArgs)放在一个池中.它不会被重用,但它的UserToken被用于跟踪连接; 例如那些断开连接的连接或那些7分钟内没有发送任何数据的连接将被关闭和处理(SocketAsyncEventArgs的AcceptSocket将被关闭(两者),关闭和处理,SocketAsyncEventArgs对象本身也是如此).这是一个执行这些任务的Sudo类,但所有其他逻辑和日志记录以及错误检查和其他任何内容都被删除,以使其简单明了(也许更容易发现有问题的代码):

class Sudo
{
    Socket _listener;
    int _port = 8797;

    public Sudo()
    {
        var ipEndPoint = new IPEndPoint(IPAddress.Any, _port);
        _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        _listener.Bind(ipEndPoint);

        _listener.Listen(100);

        Accept(null);
    }

    void Accept(SocketAsyncEventArgs acceptEventArg)
    {
        if (acceptEventArg == null)
        {
            acceptEventArg = new SocketAsyncEventArgs();
            acceptEventArg.Completed += AcceptCompleted;
        }
        else acceptEventArg.AcceptSocket = null;

        bool willRaiseEvent = _listener.AcceptAsync(acceptEventArg); ;

        if (!willRaiseEvent) Accepted(acceptEventArg);
    }

    void AcceptCompleted(object sender, SocketAsyncEventArgs e)
    {
        Accepted(e);
    }

    void Accepted(SocketAsyncEventArgs e)
    {
        var acceptSocket = e.AcceptSocket;
        var readEventArgs = CreateArg(acceptSocket);

        var willRaiseEvent = acceptSocket.ReceiveAsync(readEventArgs);

        Accept(e);

        if (!willRaiseEvent) Received(readEventArgs);
    }

    SocketAsyncEventArgs CreateArg(Socket acceptSocket)
    {
        var arg = new SocketAsyncEventArgs();
        arg.Completed += IOCompleted;

        var buffer = new byte[64 * 1024];
        arg.SetBuffer(buffer, 0, buffer.Length);

        arg.AcceptSocket = acceptSocket;

        arg.SocketFlags = SocketFlags.None;

        return arg;
    }

    void IOCompleted(object sender, SocketAsyncEventArgs e)
    {
        switch (e.LastOperation)
        {
            case SocketAsyncOperation.Receive:
                Received(e);
                break;
            default: break;
        }
    }

    void Received(SocketAsyncEventArgs e)
    {
        if (e.SocketError != SocketError.Success || e.BytesTransferred == 0 || e.Buffer == null || e.Buffer.Length == 0)
        {
            // Kill(e);
            return;
        }

        var bytesList = new List<byte>();
        for (var i = 0; i < e.BytesTransferred; i++) bytesList.Add(e.Buffer[i]);

        var bytes = bytesList.ToArray();

        Process(bytes);

        ReceiveRest(e);

        Perf.IncOp();
    }

    void ReceiveRest(SocketAsyncEventArgs e)
    {
        e.SocketFlags = SocketFlags.None;
        for (int i = 0; i < e.Buffer.Length; i++) e.Buffer[i] = 0;
        e.SetBuffer(0, e.Buffer.Length);

        var willRaiseEvent = e.AcceptSocket.ReceiveAsync(e);
        if (!willRaiseEvent) Received(e);
    }

    void Process(byte[] bytes) { }
}
Run Code Online (Sandbox Code Playgroud)

Cor*_*son 5

之所以变慢,是因为这些线程中的每个线程都需要进行上下文切换,这是一个相对昂贵的操作。添加的线程越多,仅用于上下文切换而不用于实际代码的CPU所占的比例就越高。

您已经以一种非常奇怪的方式解决了每个客户端一个线程的瓶颈。服务器端异步的全部要点是减少线程数-每个客户端不具有一个线程,但理想情况下,系统中的每个逻辑处理器只有一个或两个。

您发布的异步代码看起来不错,所以我只能猜测您的Process方法很容易忽略非异步,从而阻止了其中的I / O,即数据库或文件访问。当I / O阻塞时,.NET线程池会检测到此情况并自动启动一个新线程-基本上,在这里,随着I / O Process成为瓶颈,它逐渐失去了控制。

异步管道实际上需要100%异步才能从中获得任何重大收益。Half-in将让您编写复杂的代码,其性能与简单的同步代码一样差。

如果您绝对不能完全使该Process方法异步,则可能有些伪造它。让事情在队列中等待由有限的小型线程池进行处理。