Ank*_*jay 12 c# async-await dapper iasyncenumerable
我们最近将使用的 ASP.NET Core API 迁移Dapper到 .NET Core 3.1。迁移后,我们觉得有机会为我们的一个端点使用最新IAsyncEnumerable功能C# 8。
这是更改前的伪代码:
public async Task<IEnumerable<Item>> GetItems(int id)
{
var reader = await _connection.QueryMultipleAsync(getItemsSql,
param: new
{
Id = id
});
var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
if (idFromDb == null)
{
return null;
}
var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);
return Stream(reader, items);
}
private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
using (reader)
{
foreach (var item in items)
{
yield return item;
}
}
}
Run Code Online (Sandbox Code Playgroud)
之后IAsyncEnumerable代码更改:
// Import Nuget pacakage: System.Linq.Async
public async Task<IAsyncEnumerable<Item>> GetItems(int id)
{
var reader = await _connection.QueryMultipleAsync(getItemsSql,
param: new
{
Id = id
});
var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
if (idFromDb == null)
{
return null;
}
var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);
return Stream(reader, items);
}
private IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
using (reader)
{
await foreach (var item in items.ToAsyncEnumerable())
{
yield return item;
}
}
}
Run Code Online (Sandbox Code Playgroud)
上面的方法是ToAsyncEnumerable从这篇文章中得到了松散的启发,但我不能 100% 确定我是否在正确的地方/上下文中使用它。
题:
IEnumerable但我们可以用ToAsyncEnumerable它来将它转换IAsyncEnumerable为async stream上面的for吗?注意:这个问题看起来类似于如果与 async/await 一起使用,返回 IEnumerable 会发生什么(使用 Dapper 从 SQL Server 流式传输数据)?但我认为这不能回答我的问题。
Dav*_*ave 18
更新:当我第一次写这个答案时,我并不知道异步迭代器。感谢 Theodor Zoulias 指出这一点。鉴于此,可以采用一种更简单的方法:
using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();
// Consider using reader.NextResultAsync(). Follow github issue for details:
while (await reader.ReadAsync()) {
yield return rowParser(reader);
}
Run Code Online (Sandbox Code Playgroud)
参考: https: //github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322
原答案:
这是我编写的一个IAsyncEnumerable包装器,可以帮助那些想要使用 async/await 流式传输无缓冲数据并且还想要 Dapper 类型映射的强大功能的人:
public class ReaderParser<T> : IAsyncEnumerable<T> {
public ReaderParser(SqlDataReader reader) {
Reader = reader;
}
private SqlDataReader Reader { get; }
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) {
return new ReaderParserEnumerator<T>(Reader);
}
}
public class ReaderParserEnumerator<T> : IAsyncEnumerator<T> {
public ReaderParserEnumerator(SqlDataReader reader) {
Reader = reader;
RowParser = reader.GetRowParser<T>();
}
public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);
private SqlDataReader Reader { get; }
private Func<IDataReader, T> RowParser { get; }
public async ValueTask DisposeAsync() {
await Reader.DisposeAsync();
}
public async ValueTask<bool> MoveNextAsync() {
return await Reader.ReadAsync();
}
}
Run Code Online (Sandbox Code Playgroud)
用法:
var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);
Run Code Online (Sandbox Code Playgroud)
然后,包System.Linq.Async基本上添加了您知道和喜欢的所有不错的IEnumerable扩展,例如在我的用法中:
var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();
Run Code Online (Sandbox Code Playgroud)
自2.0.138 版本起,Dapper 中就可以直接使用此功能。使用其QueryUnbufferedAsync DbConnection扩展名或GridReader.ReadUnbufferedAsync批量方法。
对于以前版本的 Dapper:
戴夫的答案给出了所需的一切,但对我来说还不够简单。
因此,从它的详细阐述来看,这是一个获取IAsyncEnumerablewith Dapper 的扩展方法:
/// <summary>
/// Asynchronously enumerates the results of a query.
/// </summary>
/// <typeparam name="T">The type of result to return.</typeparam>
/// <param name="cnn">The connection to query on.</param>
/// <param name="sql">The SQL to execute for the query.</param>
/// <param name="param">The parameters to pass, if any.</param>
/// <param name="transaction">The transaction to use, if any.</param>
/// <returns>An asynchronous enumerator of the results.</returns>
/// <remarks>See <see href="https://stackoverflow.com/a/66723553/1178314"/> and
/// <see href="https://github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322"/>.</remarks>
public static async IAsyncEnumerable<T> EnumerateAsync<T>(this DbConnection cnn, string sql, object param = null, IDbTransaction transaction = null)
{
await using var reader = await cnn.ExecuteReaderAsync(sql, param, transaction).ConfigureAwait(false);
var rowParser = reader.GetRowParser<T>();
while (await reader.ReadAsync().ConfigureAwait(false))
{
yield return rowParser(reader);
}
while (await reader.NextResultAsync().ConfigureAwait(false)) { }
}
Run Code Online (Sandbox Code Playgroud)
请注意,您必须使用 a DbConnection,而不是IDbConnection。否则reader.ReadAsync将无法使用。
然后在您的代码中您可以执行以下操作:
var asyncEnumerable = connection.EnumerateAsync<YourDto>("select ...", yourParameters);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2952 次 |
| 最近记录: |