如果将IEnumerable与async / await一起使用(从带有Dapper的SQL Server中流式传输数据)一起使用,会发生什么情况?

Ily*_*dik 9 c# sql-server async-await dapper c#-8.0

我正在使用Dapper从SQL Server中的一个非常大的数据集中流式传输数据。返回IEnumerable和调用可以正常工作Query(),但是当我切换到时QueryAsync(),该程序似乎尝试从SQL Server而非流式读取所有数据。

根据这个问题,它应该可以正常工作buffered: false,我正在做,但是这个问题什么也没说async/await

现在根据这个问题,要完成我想做的事情并不容易QueryAsync()

我是否正确理解在切换上下文时会枚举可枚举的对象async/await

另一个问题是,当新的C#8异步流可用时,是否可以做到这一点?

Pan*_*vos 9

如果检查源代码,您会发现您的怀疑几乎是正确的。当buffered为false时,QueryAsync同步流。

if (command.Buffered)
{
    var buffer = new List<T>();
    var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
    while (await reader.ReadAsync(cancel).ConfigureAwait(false))
    {
        object val = func(reader);
        if (val == null || val is T)
        {
            buffer.Add((T)val);
        }
        else
        {
            buffer.Add((T)Convert.ChangeType(val, convertToType, CultureInfo.InvariantCulture));
        }
    }
    while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
    command.OnCompleted();
    return buffer;
}
else
{
    // can't use ReadAsync / cancellation; but this will have to do
    wasClosed = false; // don't close if handing back an open reader; rely on the command-behavior
    var deferred = ExecuteReaderSync<T>(reader, func, command.Parameters);
    reader = null; // to prevent it being disposed before the caller gets to see it
    return deferred;
}
Run Code Online (Sandbox Code Playgroud)

正如评论所解释的,ReadAsync当期望返回类型是IEnumerable时,将无法使用。这就是为什么必须引入C#8的异步枚举的原因。

ExecuteReaderSync的代码是:

private static IEnumerable<T> ExecuteReaderSync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (reader.Read())
        {
            yield return (T)func(reader);
        }
        while (reader.NextResult()) { /* ignore subsequent result sets */ }
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}
Run Code Online (Sandbox Code Playgroud)

它使用Read代替ReadAsync

C#8异步流将允许对其进行重写以返回IAsyncEnumerable。仅仅更改语言版本并不能解决问题。

鉴于异步流上的当前文档,这看起来像:

private static async IAsyncEnumerable<T> ExecuteReaderASync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (await reader.ReadAsync())
        {
            yield return (T)func(reader);
        }

        while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
         command.OnCompleted();
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}
Run Code Online (Sandbox Code Playgroud)

Buuuuuut异步流是只能在.NET Core上运行的功能之一,并且可能尚未实现。当我尝试在Sharplab.io中写一个时,是Kaboom。[connection lost, reconnecting…]

  • @IlyaChernomordik如果您被“缓冲”,那么它将“异步”填充缓冲区,并且仅在“满”时将其返回(异步)给您;当您枚举它时,同步是可以的,因为它已经全部在本地缓冲了。问题场景是非缓冲的;当*非缓冲*变得多毛-那么它是异步的直到获取数据,然后它切换为同步*同时仍从DB中读取数据* (2认同)

Mar*_*ell 8

特别是在dapper的上下文中,是:它需要一个不同的API,如@Panagiotis的出色回答所解释。随之而来的并不是这样的答案,而是面临相同挑战的实施者可能希望考虑的其他环境。

我还没有为精巧的人“撒谎”(尽管我 SE.Redis的),而且我在各种选择之间感到困惑:

  1. 仅为 .NET Core添加新的API ,并返回适当的异步枚举类型
  2. 彻底粉碎现有API作为重大更改(“重大”等),将其更改为返回异步可枚举类型

我们可能会使用“ 1”,但是我不得不说,第二种选择非常诱人,这有充分的理由:

  • 现有的API可能无法满足人们的期望
  • 我们希望新代码开始使用它

但是奇怪的是.NET Core 3.0的本质IAsyncEnumerable<T>-显然Dapper不仅针对.NET Core 3.0;我们可以:

  1. 将功能限制为.NET Core 3.0,然后返回 IAsyncEnumerable<T>
  2. 限制为.NET Core 3.0,然后返回IAsyncEnumerable<T>
  3. 对先前的框架依赖System.Linq.Async(这不是“官方的”,但对于我们的目的来说是正式的),然后返回 IAsyncEnumerable<T>
  4. 返回一个实际上不是IAsyncEnumerable<T>(但IAsyncEnumerable<T>在可用时实现的)自定义可枚举类型,并手动实现状态机-鸭子式的性质foreach意味着只要我们的自定义可枚举类型提供正确的方法,该方法就可以正常工作

我认为我们可能会选择选项3,但重申一下:是的,有些事情需要改变。

  • @IlyaChernomordik的确是这样,默认值为“ true”(用于缓冲),因为大多数时候人们都在读取20行,以此类推,而不是200万行-如果我们默认设置为false,则会导致人们看到错误,例如:“我从方法中返回了可枚举,在连接的过程中传递了“ using”块”,“我枚举了7次,发生了不好的事情”,“它一直说我有一个开放的阅读器”。默认更容易缓冲。 (2认同)