是否可以和/或建议在一个对象中使用多个 System.Threading.Channels?

pwe*_*ber 1 .net c# multithreading system.threading.channels

我正在开发一个 .net core 3.0 web 应用程序,并决定在单例服务中使用 System.Threading.Channels。我的范围请求服务的顶层注入这个单例来访问它的通道。

我决定使用这种模式将请求(为其他连接的客户端生成实时更新)与这些更新的执行分离。

在一个对象中实现一个通道有很多例子。

谁能告诉我是否可以/建议在我的单身人士中使用多个频道?

我还没有遇到创建多个通道并在创建单例时“启动”它们的任何问题。我只是还没有达到可以测试多个客户端请求点击单例上不同通道以查看它是否运行良好的地步。(或者根本没有?...)

我使用多个通道的主要动机是我希望单例根据通道中项目的类型做不同的事情。

public class MyChannelSingleton 
{
    public Channel<MyType> TypeOneChannel = Channel.CreateUnbounded<MyType>();
    public Channel<MyOtherType> TypeTwoChannel = Channel.CreateUnbounded<MyOtherType>();

    public MyChannelSingleton() 
    {
        StartChannels();
    }

    private void StartChannels() 
    {
        // discarded async tasks due to calling in ctor
        _ = StartTypeOneChannel();
        _ = StartTypeTwoChannel();
    }

    private async Task StartTypeOneChannel()
    {
        var reader = TypeOneChannel.Reader;

        while (await reader.WaitToReadAsync())
        {
            if (reader.TryRead(out MyType item))
            {
                // item is sucessfully read from channel
            }
        }
    }

    private async Task StartTypeTwoChannel()
    {
        var reader = TypeTwoChannel.Reader;

        while (await reader.WaitToReadAsync())
        {
            if (reader.TryRead(out MyOtherType item))
            {
                // item is sucessfully read from channel
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我也希望永远不会“完成”通道,并在应用程序的生命周期内让它们可用。

Pan*_*vos 7

只要您正确使用它们,您就可以使用任意数量的工具。事实上,使用公开处理管道的后台服务(本质上是一个单例)是在 .NET Core 中使用它们的一种非常常见的方法。

通道不仅仅是异步队列。它们类似于 DataFlow 块——它们可用于创建处理管道,每个块/worker 处理来自输入缓冲区/ChannelReader 的数据并将结果转发到输出缓冲区/ChannelWriter。DataFlow 块通过任务本身处理异步处理。对于通道,我们需要自己处理工人任务。

我们需要记住的一个非常重要的概念是通道不是直接访问的。事实上,在几乎所有情况下,它们甚至不应该作为字段或属性公开。在大多数情况下,只需要一个 ChannelReader。在某些情况下,例如在管道的头部,可能会暴露ChannelWriter 。或不。

个体工人/步骤

典型的工作步骤如下所示

private ChannelReader<MyType2> Step1(ChannelReader<MyType> reader,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<MyOtherType>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        await foreach(var item from reader.ReadAllAsync(token))
        {
             MyType2 result=........;
             await writer.WriteAsync(result);
        }
    },token).ContinueWith(t=>channel.TryComplete(t));

    return channel.Reader;    
}
Run Code Online (Sandbox Code Playgroud)

一些注意事项:

  • 您可以创建多个任务,如果您希望并使用它Task.WhenAll来等待所有工作人员在关闭通道之前完成。
  • 如果管道不够快,您可以使用有通道来防止大量消息累积。
  • 如果发出取消信号,则从输入通道读取工作任务都将被取消。
  • 当工作任务完成时,无论是因为被取消还是被抛出,都会关闭通道。
  • 当“头”通道完成时,完成将从一个步骤流到下一个步骤。

组合步骤

通过将一个输出阅读器传递给另一个输入阅读器,可以组合多个步骤,例如:

var cts=new CancelaltionTokenSource();

var step1=Step1(headReader,cts.Token);
var step2=Step2(step1,cts.Token);
var step3=Step3(step2,cts.Token);
...
await stepN.Completion;
Run Code Online (Sandbox Code Playgroud)

CancellationTokenSource 可用于提前结束管道或设置超时以防止管道挂起。

管道头

“头部”读取器可能来自“适配器”方法,例如:

private ChannelReader<T> ToChannel(IEnumerable<T> input,CancellationToken token)
{
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    foreach(var item from input)
    {
        if (token.IsCancellationRequested)
        {
            break;
        }
        writer.TryWrite(result);
    }
    //No-one else is going to complete this channel
    channel.Complete();
    return channel.Reader;    
}
Run Code Online (Sandbox Code Playgroud)

在后台服务的情况下,我们可以使用服务方法将输入“发布”到头部通道,例如:

class MyService
{
    Channel<MyType0> _headChannel;

    public MyService()
    {
        _headChannel=Channel.CreateBounded<MyType0>(5);
    }

    public async Task ExecuteAsync(CancellationToken token)
    {
        var step1=Step1(_headChannel.Reader,token);
        var step2=Step2(step1,token);        
        await step2.Completion;
    }

    public Task PostAsync(MyType0 input)
    {
        return _headChannel.Writer.WriteAsync(input);
    }

    public Stop()
    {
        _headChannel.Writer.TryComplete();
    }

...

}
Run Code Online (Sandbox Code Playgroud)

我故意使用看起来像BackgroundService 方法名称的方法名称。StartAsync 或 ExecuteAsync 可用于设置管道。StopAsync 可用于表示其完成,例如当最终用户点击Ctrl+ 时C

排队的 BackgroundService示例中显示的另一个有用的技术是注册一个接口,客户端可以使用该接口发布消息而不是直接访问服务类,例如:

interface IQueuedService<T>
{
    Task PostAsync(T input);
}

Run Code Online (Sandbox Code Playgroud)

结合 System.Linq.Async

ReadAllAsync()方法返回一个IAsyncEnumerable<T>,这意味着我们可以在System.Linq.Async 中使用运算符,例如 Where 或 Take 来过滤、批处理或转换消息,例如:

private ChannelReader<MyType> ActiveOnly(ChannelReader<MyType> reader,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<MyType>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        var inpStream=reader.ReadAllAsync(token)
                            .Where(it=>it.IsActive);
        await foreach(var item from inpStream)
        {
             await writer.WriteAsync(item);
        }
    },token).ContinueWith(t=>channel.TryComplete(t));

    return channel.Reader;    
}
Run Code Online (Sandbox Code Playgroud)