IAsyncEnumerable 和数据库查询

T. *_*nik 8 c# asp.net .net-core asp.net-core iasyncenumerable

我有三个返回IAsyncEnumerable的控制器方法WeatherForecast。第一个 #1 使用SqlConnection读者并产生结果async。第二个#2 使用具有使用扩展能力的 EF Core AsAsyncEnumerable。第三个 #3 使用 EF Core 和ToListAsync方法。

我认为 #1 和 #2 的缺点是,例如,如果我在 while 或 for every 内部执行一些耗时的操作,那么数据库连接将一直打开到最后。在场景 #3 中,我可以使用关闭的连接迭代列表并执行其他操作。

但是,我不知道IAsyncEnumerable对于数据库查询是否有意义。是否存在内存和性能问题?如果我用于IAsyncEnumerable从 API 返回 HTTP 请求,那么一旦返回响应,它就不在内存中,我可以返回下一个响应,依此类推。但是数据库呢?如果我选择所有行(使用IAsyncEnumerableToListAsync),整个表在哪里?

也许这不是 StackOverflow 的问题,我在这里遗漏了一些重要的东西。

#1

[HttpGet("db", Name = "GetWeatherForecastAsyncEnumerableDatabase")]
public async IAsyncEnumerable<WeatherForecast> GetAsyncEnumerableDatabase()
{
    var connectionString = "";
    await using var connection = new SqlConnection(connectionString);

    string sql = "SELECT * FROM [dbo].[Table]";
    await using SqlCommand command = new SqlCommand(sql, connection);

    connection.Open();
    await using var dataReader = await command.ExecuteReaderAsync();
    while (await dataReader.ReadAsync())
    {
        yield return new WeatherForecast
        {
            Date = Convert.ToDateTime(dataReader["Date"]),
            Summary = Convert.ToString(dataReader["Summary"]),
            TemperatureC = Convert.ToInt32(dataReader["TemperatureC"])
        };
    }

    await connection.CloseAsync();
}
Run Code Online (Sandbox Code Playgroud)

#2

[HttpGet("ef", Name = "GetWeatherForecastAsyncEnumerableEf")]
public async IAsyncEnumerable<WeatherForecast> GetAsyncEnumerableEf()
{
    await using var dbContext = _dbContextFactory.CreateDbContext();
    await foreach (var item in dbContext
        .Tables
        .AsNoTracking()
        .AsAsyncEnumerable())
    {
        yield return new WeatherForecast
        {
            Date = item.Date,
            Summary = item.Summary,
            TemperatureC = item.TemperatureC
        };
    }
}
Run Code Online (Sandbox Code Playgroud)

#3

[HttpGet("eflist", Name = "GetWeatherForecastAsyncEnumerableEfList")]
public async Task<IEnumerable<WeatherForecast>> GetAsyncEnumerableEfList()
{
    await using var dbContext = _dbContextFactory.CreateDbContext();
    var result =  await dbContext
        .Tables
        .AsNoTracking()
        .Select(item => new WeatherForecast
        {
            Date = item.Date,
            Summary = item.Summary,
            TemperatureC = item.TemperatureC
        })
        .ToListAsync();

    return result;
}
Run Code Online (Sandbox Code Playgroud)

Pan*_*vos 5

服务器端

如果我只关心服务器,我会选择 .NET 6 中的选项 4:

  • 使用注入的 DbContext,编写 LINQ 查询并返回结果,而AsAsyncEnumerable()不是ToListAsync()
public class WeatherForecastsController:ControllerBase
{
    WeatherDbContext _dbContext;
    public WeatherForecastsController(WeatherDbContext dbContext)
    {
        _dbContext=dbContext;
    }

    public async IAsyncEnumerable<WeatherForecast> GetAsync()
    {
        return _dbContext.Forecasts.AsNoTracking()
                         .Select(item => new WeatherForecast
                         {
                             Date = item.Date,
                             Summary = item.Summary,
                             TemperatureC = item.TemperatureC
                         })
                         .AsAsyncEnumerable();
    }
}
Run Code Online (Sandbox Code Playgroud)

为每个请求创建一个新的 Controller 实例,这意味着只要处理请求,DbContext 就会一直存在。

[FromServices]属性可用于直接将 DbContext 注入到操作方法中。行为并没有真正改变,DbContext 的范围仍然是请求:

public async IAsyncEnumerable<WeatherForecast> GetAsync([FromServices] WeatherContext dbContext)
{
    ...
}
Run Code Online (Sandbox Code Playgroud)

ASP.NET Core 将发出一个 JSON 数组,但至少元素一旦可用就会立即发送给调用者。

客户端

客户在反序列化之前仍然必须接收整个 JSON 数组。

在 .NET 6 中处理此问题的一种方法是使用DeserializeAsyncEnumerable来解析响应流并在项目到来时发出它们:

using var stream=await client.GetAsStreamAsync(...);
var forecasts= JsonSerializer.DeserializeAsyncEnumerable(stream, new JsonSerializerOptions
        {
            DefaultBufferSize = 128
        });
await foreach(var forecast in forecasts)
{
...
}
Run Code Online (Sandbox Code Playgroud)

默认缓冲区大小为 16KB,因此如果我们想尽快接收对象,则需要较小的缓冲区大小。

但这是一个特定于解析器的解决方案。

使用流式 JSON 响应

此问题的常见解决方法是使用流式 JSON(又称为每行 JSON、换行符分隔 JSON 又称为 JSON-NL 或其他)。所有名称都指同一件事 - 发送由换行符分隔的无缩进 JSON 对象流。这是一种古老的技术,许多人试图劫持并呈现为自己的技术

{ "Date": "2022-10-18", Summary = "Blah", "TemperatureC"=18.5 }
{ "Date": "2022-10-18", Summary = "Blah", "TemperatureC"=18.5 }
{ "Date": "2022-10-18", Summary = "Blah", "TemperatureC"=18.5 }
Run Code Online (Sandbox Code Playgroud)

这不是有效的 JSON,但许多解析器可以处理它。即使解析器不能,我们也可以简单地一次读取一行文本并解析它。

使用不同的协议

即使流式 JSON 响应也是一种解决方法。HTTP 首先不允许服务器端流式传输。即使客户端只读取前 3 项,服务器也必须发送所有数据,因为无法取消响应。

使用允许流式传输的协议会更有效。ASP.NET Core 6 提供了两个选项:

在这两种情况下,服务器都会在对象可用时立即将对象发送到客户端。客户端可以根据需要取消流。

在 SignalR 集线器中,代码可以返回一个IAsyncEnumerable或一个 Channel:

public class AsyncEnumerableHub : Hub
{
    ...
    public async IAsyncEnumerable<WeatherForecast> GetForecasts()
    {
        return _dbContext.Forecasts.AsNoTracking()
                         .Select(item => new WeatherForecast
                         {
                             Date = item.Date,
                             Summary = item.Summary,
                             TemperatureC = item.TemperatureC
                         })
                         .AsAsyncEnumerable();
    }
}
Run Code Online (Sandbox Code Playgroud)

在 gRPC 中,服务器方法将对象写入响应流:

public override async Task StreamingFromServer(ForecastRequest request,
    IServerStreamWriter<ForecastResponse> responseStream, ServerCallContext context)
{
    ...
    await foreach (var item in queryResults)
    {
        if (context.CancellationToken.IsCancellationRequested)
        {
            return;
        }
        await responseStream.WriteAsync(new ForecastResponse{Forecast=item});
    }
}
Run Code Online (Sandbox Code Playgroud)