对于 TPL 数据流:如何获取 TransformBlock 生成的所有输出,同时阻塞直到所有输入都已处理完毕?

Nov*_*Eng 1 c# task-parallel-library tpl-dataflow

我向单个数据库同步提交一系列select语句(查询 - 数千个),并DataTable为每个查询返回一个语句(注意:该程序仅在运行时了解其正在扫描的数据库模式,因此指某东西的用途DataTables)。该程序在客户端计算机上运行并连接到远程计算机上的数据库。运行这么多查询需要很长时间。因此,假设异步或并行执行它们会加快速度,我正在探索TPL Dataflow (TDF)。我想使用该TDF库,因为它似乎可以处理与编写多线程代码相关的所有问题,否则需要手动完成。

显示的代码基于http://blog.i3arnon.com/2016/05/23/tpl-dataflow/。它很小,只是帮助我理解 的基本操作TDF。请知道我已经阅读了很多博客并编写了很多迭代代码来尝试解决这个问题。

尽管如此,在当前的迭代中,我有一个问题:

问题

代码位于一个button click方法内部(用户使用 UI 选择一台机器、一个 SQL 实例和一个数据库,然后开始扫描)。带有运算符的两行在await构建时返回错误:The 'await' operator can only be used within an async method. Consider marking this method with the 'async' modifier and changing its return type to 'Task'。我无法更改按钮单击方法的返回类型。我是否需要以某种方式将button click方法与async-await代码隔离?

问题

尽管我找到了描述 的基础知识的漂亮文章TDF,但我找不到如何获取每次调用所产生的输出TransformBlock(即 a DataTable)的示例。虽然我想提交查询async,但我确实需要阻塞,直到提交的所有查询都TransformBlock完成。在所有查询完成之前,如何获取and 块DataTable生成的一系列 s ?TransformBlock

注意:我承认我现在只有一个区块。至少,我将添加一个取消块,因此需要/想要使用 TPL。

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{

    UserInput userInput = new UserInput
    {
        MachineName = "gat-admin",
        InstanceName = "",
        DbName = "AdventureWorks2014",
    };

    DataAccessLayer dataAccessLayer = new DataAccessLayer(userInput.MachineName, userInput.InstanceName);

    //CreateTableQueryList gets a list of all tables from the DB and returns a list of 
    // select statements, one per table, e.g., SELECT * from [schemaname].[tablename]
    IList<String> tableQueryList = CreateTableQueryList(userInput);

    // Define a block that accepts a select statement and returns a DataTable of results
    // where each returned record is: schemaname + tablename + columnname + column datatype + field data
    // e.g., if the select query returns one record with 5 columns, then a datatable with 5 
    // records (one per field) will come back 

    var transformBlock_SubmitTableQuery = new TransformBlock<String, Task<DataTable>>(
        async tableQuery => await dataAccessLayer._SubmitSelectStatement(tableQuery),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
        });

    // Add items to the block and start processing
    foreach (String tableQuery in tableQueryList)
    {
        await transformBlock_SubmitTableQuery.SendAsync(tableQuery);
    }

    // Enable the Cancel button and disable the Start button.
    toolStripButtonStart.Enabled = false;
    toolStripButtonStop.Enabled = true;

    //shut down the block (no more inputs or outputs)
    transformBlock_SubmitTableQuery.Complete();

    //await the completion of the task that procduces the output DataTable
    await transformBlock_SubmitTableQuery.Completion;
}

public async Task<DataTable> _SubmitSelectStatement(string queryString )
{
    try
    {

        .
        .
        await Task.Run(() => sqlDataAdapter.Fill(dt));

        // process dt into the output DataTable I need

        return outputDt;
    }
    catch
    {
        throw;
    }

}
Run Code Online (Sandbox Code Playgroud)

The*_*ias 5

检索 a 输出的最简洁方法TransformBlock是使用方法OutputAvailableAsync和执行嵌套循环TryReceive。它有点冗长,因此您可以考虑将此功能封装在扩展方法中ToListAsync

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    List<T> list = new();
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (source.TryReceive(out T item))
        {
            list.Add(item);
        }
    }
    Debug.Assert(source.Completion.IsCompleted);
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}
Run Code Online (Sandbox Code Playgroud)

然后你可以使用ToListAsync这样的方法:

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{
    TransformBlock<string, DataTable> transformBlock = new(async query => //...
    //...
    transformBlock.Complete();

    foreach (DataTable dataTable in await transformBlock.ToListAsync())
    {
        // Do something with each dataTable
    }
}
Run Code Online (Sandbox Code Playgroud)

注意:ToListAsync实现是破坏性的,这意味着如果发生错误,所使用的消息将被丢弃。为了使其无损,只需删除该await source.Completion线即可。在这种情况下,在处理带有已消费消息的列表后,您必须记住awaitCompletion的 ,否则您将不知道是否TransformBlock无法处理其所有输入。

确实存在检索数据流块的输出的替代方法,例如dcastro 的这种方法使用 aBufferBlock作为缓冲区,并且性能稍高一些,但我个人认为上述方法更安全、更直接。

您还可以以流式方式将其作为序列检索,而不是在检索输出之前等待块完成IAsyncEnumerable<T>

public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IReceivableSourceBlock<T> source,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (source.TryReceive(out T item))
        {
            yield return item;
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
    Debug.Assert(source.Completion.IsCompleted);
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
}
Run Code Online (Sandbox Code Playgroud)

DataTable这样,您就可以在烹饪完成后立即拿到每个食物,而不必等待所有查询的处理。要使用IAsyncEnumerable<T>您只需将 移至await之前foreach

await foreach (DataTable dataTable in transformBlock.ToAsyncEnumerable())
{
    // Do something with each dataTable
}
Run Code Online (Sandbox Code Playgroud)

高级:下面是该方法的更复杂版本,它以与和ToListAsync等方法传播相同的直接方式传播底层块的所有错误。原始的简单方法使用答案中所示的技术将错误包装在嵌套中。Task.WhenAllParallel.ForEachAsyncToListAsyncAggregateExceptionWait

/// <summary>
/// Asynchronously waits for the successful completion of the specified source, and
/// returns all the received messages. In case the source completes with error,
/// the error is propagated and the received messages are discarded.
/// </summary>
public static Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);

    async Task<List<T>> Implementation()
    {
        List<T> list = new();
        while (await source.OutputAvailableAsync(cancellationToken)
            .ConfigureAwait(false))
            while (source.TryReceive(out T item))
                list.Add(item);
        await source.Completion.ConfigureAwait(false);
        return list;
    }

    return Implementation().ContinueWith(t =>
    {
        if (t.IsCanceled) return t;
        Debug.Assert(source.Completion.IsCompleted);
        if (source.Completion.IsFaulted)
        {
            TaskCompletionSource<List<T>> tcs = new();
            tcs.SetException(source.Completion.Exception.InnerExceptions);
            return tcs.Task;
        }
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}
Run Code Online (Sandbox Code Playgroud)

.NET 6 更新:DataflowBlock.ReceiveAllAsync .NET 6 中引入了新的 API ,具有以下签名:

public static IAsyncEnumerable<TOutput> ReceiveAllAsync<TOutput> (
    this IReceivableSourceBlock<TOutput> source,
    CancellationToken cancellationToken = default);
Run Code Online (Sandbox Code Playgroud)

与前述方法类似ToAsyncEnumerable。重要的区别在于,新 API在传播其所有消息后,不会传播消费块可能出现的异常。此行为与Channels 库中的source类似 API 不一致。ReadAllAsync我已在 GitHub 上报告了这种一致性,该问题目前被 Microsoft 标记为bug