使用BlockingCollection <T>()扩展连接

Fin*_*une 3 c# parallel-processing performance networking multithreading

我有一台通过TCP LAN与50个或更多设备进行通信的服务器。每个套接字读取消息循环都有一个Task.Run。

我将每条消息的到达缓冲到一个阻塞队列中,在每个阻塞队列中都有一个使用BlockingCollection.Take()的Task.Run。

所以像(半伪代码):

套接字读取任务

Task.Run(() =>
{
    while (notCancelled)
    {
        element = ReadXml();
        switch (element)
        {
            case messageheader:
                MessageBlockingQueue.Add(deserialze<messageType>());
            ...
        }
    }
});
Run Code Online (Sandbox Code Playgroud)

消息缓冲区任务

Task.Run(() =>
{
    while (notCancelled)
    {
        Process(MessageQueue.Take());
    }
});
Run Code Online (Sandbox Code Playgroud)

因此,这将使50多个读取任务和50多个任务在自己的缓冲区中阻塞。

我这样做是为了避免阻塞阅读循环,并允许程序更公平地分配对消息的处理时间,所以我相信。

这是一种低效的处理方式吗?有什么更好的方法?

Mar*_*ell 5

您可能对“ channels”工作感兴趣,尤其是:System.Threading.Channels。这样做的目的是提供异步的生产者/消费者队列,涵盖单个和多个生产者和消费者方案,上限等。通过使用异步API,您不必等待很多事情就占用大量线程。

您的读取循环将变为:

while (notCancelled) {
    var next = await queue.Reader.ReadAsync(optionalCancellationToken);
    Process(next);
}
Run Code Online (Sandbox Code Playgroud)

和生产者:

switch (element)
{
    case messageheader:
        queue.Writer.TryWrite(deserialze<messageType>());
        ...
}
Run Code Online (Sandbox Code Playgroud)

所以:最小的变化


或者-或结合使用-您可以研究“管道”(https://www.nuget.org/packages/System.IO.Pipelines/)之类的东西,因为您要处理TCP数据,所以这是理想的选择合适,这是我在Stack Overflow(用于处理大量连接)上为自定义Web套接字服务器查看的内容。由于API在整个过程中都是异步的,因此在平衡工作方面做得很好-管道API在设计时考虑了典型的TCP场景,例如,在检测帧边界时部分消耗了传入的数据流。我已经写了很多有关这种用法的文章,其中的代码示例主要。请注意,“管道”不包括直接的TCP层,但是“红est”服务器包括一个,https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/确实(披露:我写了)。