什么是ThreadPool服务器的异步/等效等价物?

cap*_*aig 20 c# tcplistener threadpool async-await

我正在使用同步apis和线程池的tcp服务器上看起来像这样:

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    ThreadPool.QueueUserWorkItem(this.HandleConnection, client);
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}
Run Code Online (Sandbox Code Playgroud)

假设我的目标是在资源使用率最低的情况下处理尽可能多的并发连接,这似乎很快就会受到可用线程数量的限制.我怀疑通过使用非阻塞任务apis,我将能够用更少的资源处理更多.

我最初的印象是这样的:

async Task Serve(){
  while(true){
    var client = await listener.AcceptTcpClientAsync();
    HandleConnectionAsync(client); //fire and forget?
  }
}
Run Code Online (Sandbox Code Playgroud)

但令我印象深刻的是,这可能会导致瓶颈.也许HandleConnectionAsync需要花费非常长的时间才能达到第一次等待,并且将阻止主接受循环继续进行.这只会使用一个线程,还是运行时会在多个线程上神奇地运行它看起来合适的东西?

有没有办法结合这两种方法,以便我的服务器将使用它所需的线程数量来确定正在运行的任务的数量,但是它不会在IO操作上不必要地阻塞线程?

在这样的情况下,是否存在最大化吞吐量的惯用方法?

nos*_*tio 23

我让Framework管理线程并且不会创建任何额外的线程,除非我可能需要进行性能分析测试.特别是,如果内部调用HandleConnectionAsync主要是IO绑定的.

无论如何,如果你想在开始时释放调用线程(调度程序)HandleConnectionAsync,那么这是一个非常简单的解决方案.您可以从一个新的线程跳ThreadPoolawait Yield().如果您的服务器在初始线程(控制台应用程序,WCF服务)上没有安装任何同步上下文的执行环境中运行,那么这是有效的,这通常是TCP服务器的情况.

[已编辑]以下说明了这一点(代码最初来自此处).注意,主while循环不会显式创建任何线程:

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

class Program
{
    object _lock = new Object(); // sync lock 
    List<Task> _connections = new List<Task>(); // pending connections

    // The core server task
    private async Task StartListener()
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            // if already faulted, re-throw any error on the calling context
            if (task.IsFaulted)
                await task;
        }
    }

    // Register and handle the connection
    private async Task StartHandleConnectionAsync(TcpClient tcpClient)
    {
        // start the new connection task
        var connectionTask = HandleConnectionAsync(tcpClient);

        // add it to the list of pending task 
        lock (_lock)
            _connections.Add(connectionTask);

        // catch all errors of HandleConnectionAsync
        try
        {
            await connectionTask;
            // we may be on another thread after "await"
        }
        catch (Exception ex)
        {
            // log the error
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            // remove pending task
            lock (_lock)
                _connections.Remove(connectionTask);
        }
    }

    // Handle new connection
    private async Task HandleConnectionAsync(TcpClient tcpClient)
    {
        await Task.Yield();
        // continue asynchronously on another threads

        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    }

    // The entry point of the console app
    static async Task Main(string[] args)
    {
        Console.WriteLine("Hit Ctrl-C to exit.");
        await new Program().StartListener();
    }
}
Run Code Online (Sandbox Code Playgroud)

或者,代码可能如下所示,没有await Task.Yield().注意,我传递一个asynclambdaTask.Run,因为我仍然希望从里面的异步API中受益HandleConnectionAsyncawait在那里使用:

// Handle new connection
private static Task HandleConnectionAsync(TcpClient tcpClient)
{
    return Task.Run(async () =>
    {
        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

[更新]基于注释:如果这将是库代码,则执行环境确实是未知的,并且可能具有非默认同步上下文.在这种情况下,我宁愿在池线程上运行主服务器循环(没有任何同步上下文):

private static Task StartListener()
{
    return Task.Run(async () => 
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            if (task.IsFaulted)
                await task;
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

这样,内部创建的所有子任务StartListener都不会受到客户端代码的同步上下文的影响.所以,我不必Task.ConfigureAwait(false)明确地在任何地方打电话. 

  • @captncraig,文档意味着不要在**UI线程上使用它进行冗长的处理**(比如使用`await Task.Yeild()`进行紧密循环).那是因为syn.UI线程的上下文使用`PostMessage`来实现这个深层内部,它可以接管其他用户输入消息,如鼠标和键盘,并阻止UI.所有这些都不适用于无上下文的环境,其中`Task.Yield()`只使用`ThreadPool.QueueUserWorkItem`,是一个方便的等待快捷方式.更多信息:http://stackoverflow.com/q/20319769/1768303 (2认同)

usr*_*usr 8

现有的答案已正确提出使用Task.Run(() => HandleConnection(client));,但没有解释原因.

原因如下:您担心,HandleConnectionAsync可能需要一些时间才能达到第一个等待.如果您坚持使用异步IO(在本例中应该如此),这意味着HandleConnectionAsync在没有任何阻塞的情况下进行CPU绑定工作.这是线程池的完美案例.它可以运行简短,无阻塞的CPU工作.

而你是对的,接受循环将HandleConnectionAsync在返回之前花费很长时间来限制(可能因为其中有大量的CPU绑定工作).如果您需要高频率的新连接,则应避免这种情况.

如果您确定没有重要的工作限制循环,您可以保存额外的线程池Task而不是这样做.

或者,您可以同时运行多个接受.替换await Serve();为(例如):

var serverTasks =
    Enumerable.Range(0, Environment.ProcessorCount)
    .Select(_ => Serve());
await Task.WhenAll(serverTasks);
Run Code Online (Sandbox Code Playgroud)

这消除了可伸缩性问题.注意,这await将吞下除一个错误之外的所有错误.


归档时间:

查看次数:

13787 次

最近记录:

6 年,2 月 前