How to perform asynchronous operations in TPL Dataflow for best performance?

use*_*612 8 c# csv task-parallel-library blockingcollection tpl-dataflow

I wrote the following method to batch-process a huge CSV file. The idea is to read a chunk of lines from the file into memory, then partition those lines into fixed-size batches. Once the partitions are obtained, the batches are sent to a server (synchronously or asynchronously), which can take a while.

private static void BatchProcess(string filePath, int chunkSize, int batchSize)
{
    List<string> chunk = new List<string>(chunkSize);

    foreach (var line in File.ReadLines(filePath))
    {
        if (chunk.Count == chunk.Capacity)
        {
            // Partition each chunk into smaller chunks grouped on column 1
            var partitions = chunk.GroupBy(c => c.Split(',')[0], (key, g) => g);

            // Further breakdown the chunks into batch size groups
            var groups = partitions.Select(x => x.Select((i, index) =>
                new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

            // Get batches from groups
            var batches = groups.SelectMany(x => x)
                .Select(y => y.Select(z => z)).ToList();

            // Process all batches asynchronously
            batches.AsParallel().ForAll(async b =>
            {
                WebClient client = new WebClient();
                byte[] bytes = System.Text.Encoding.ASCII
                    .GetBytes(string.Join(Environment.NewLine, b));
                await client.UploadDataTaskAsync("myserver.com", bytes);
            });

            // clear the chunk
            chunk.Clear();
        }

        chunk.Add(line);
    }
}

This code doesn't seem very efficient, for two reasons:

  1. The main thread reading from the CSV file is blocked until all the partitions have been processed.

  2. AsParallel blocks until all the tasks are complete. So if there are more threads available in the thread pool to do work, I'm not using them, because no tasks are bound to any partition.

batchSize is fixed, so it can't be changed, but chunkSize is tunable for performance. I can choose a chunkSize large enough that the number of batches created far exceeds the number of threads available in the system, but it still means the Parallel.ForEach method blocks until all the tasks are done.

How can I change the code so that all the available threads in the system are used to do work without sitting idle? I'm thinking I could use a BlockingCollection to store the batches, but I'm not sure what capacity to give it, since the number of batches in each chunk is dynamic.

Any ideas on how to use TPL to maximize thread utilization, so that most of the available threads on the system are always doing something?

Update: Here is what I have so far using TPL Dataflow. Is it correct?

private static void UploadData(string filePath, int chunkSize, int batchSize)
{
    var buffer1 = new BatchBlock<string>(chunkSize);
    var buffer2 = new BufferBlock<IEnumerable<string>>();

    var action1 = new ActionBlock<string[]>(t =>
    {
        Console.WriteLine("Got a chunk of lines " + t.Count());

        // Partition each chunk into smaller chunks grouped on column 1
        var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);

        // Further breakdown the chunks into batch size groups
        var groups = partitions.Select(x => x.Select((i, index) =>
            new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

        // Get batches from groups
        var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));

        foreach (var batch in batches)
        {
            buffer2.Post(batch);
        }

    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    buffer1.LinkTo(action1, new DataflowLinkOptions
        { PropagateCompletion = true });

    var action2 = new TransformBlock<IEnumerable<string>,
        IEnumerable<string>>(async b =>
    {
        await ExecuteBatch(b);
        return b;

    }, new ExecutionDataflowBlockOptions
        { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    buffer2.LinkTo(action2, new DataflowLinkOptions
        { PropagateCompletion = true });

    var action3 = new ActionBlock<IEnumerable<string>>(b =>
    {
        Console.WriteLine("Finished executing a batch");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    action2.LinkTo(action3, new DataflowLinkOptions
        { PropagateCompletion = true });

    Task produceTask = Task.Factory.StartNew(() =>
    {
        foreach (var line in File.ReadLines(filePath))
        {
            buffer1.Post(line);
        }

        //Once marked complete your entire data flow will signal a stop for
        // all new items
        Console.WriteLine("Finished producing");
        buffer1.Complete();
    });

    Task.WaitAll(produceTask);
    Console.WriteLine("Produced complete");

    action1.Completion.Wait();//Will add all the items to buffer2
    Console.WriteLine("Action1 complete");

    buffer2.Complete();//will not get any new items
    action2.Completion.Wait();//Process the batch of 5 and then complete

    action3.Completion.Wait();

    Console.WriteLine("Process complete");
    Console.ReadLine();
}

Jul*_*bot 3

You're already close. In TPL Dataflow, data flows from one block to the next, and you should try to keep to that paradigm. For example, action1 should be a TransformManyBlock, because an ActionBlock is an ITargetBlock (i.e., a terminating block).

When you specify PropagateCompletion on a link, the completion event is automatically routed through the blocks, so you only need to call Wait() once, on the last block.

Think of it as a chain of dominoes: you call Complete() on the first block, and it propagates through the chain to the last block.
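As a minimal sketch of both points (it assumes the System.Threading.Tasks.Dataflow NuGet package, and the block names are made up for illustration): a TransformManyBlock turns one input into many outputs, so it has an output side to link to, and with PropagateCompletion on every link a single Complete() at the head plus a single await at the tail is enough.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class PropagationDemo
{
    // Head -> TransformManyBlock -> tail. Complete() is called once on the
    // head; PropagateCompletion walks it down the chain like dominoes, so
    // only the tail's Completion needs to be awaited.
    public static async Task<List<string>> RunAsync()
    {
        var results = new List<string>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var head = new BufferBlock<string>();

        // One input line in, many fields out -- unlike ActionBlock,
        // a TransformManyBlock has an output side to link further.
        var split = new TransformManyBlock<string, string>(
            line => line.Split(','));

        var tail = new ActionBlock<string>(field => results.Add(field));

        head.LinkTo(split, link);
        split.LinkTo(tail, link);

        head.Post("a,b,c");
        head.Complete();            // one Complete() at the head...
        await tail.Completion;      // ...one await at the tail.
        return results;
    }

    static async Task Main() =>
        Console.WriteLine(string.Join(" ", await RunAsync()));
}
```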

You should also consider what you are multi-threading and why. Your example is heavily I/O-bound, and I don't think tying up a bunch of threads to wait for I/O to complete is the right solution.
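This is why an async block body matters for I/O-bound work: while the operation is awaited, no thread-pool thread is held. A rough sketch (assuming the System.Threading.Tasks.Dataflow package; Task.Delay stands in for the real upload call):

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class IoBoundDemo
{
    // An async ActionBlock awaits I/O without pinning a thread-pool thread;
    // MaxDegreeOfParallelism caps how many operations are in flight at once.
    public static async Task<long> RunAsync()
    {
        var upload = new ActionBlock<int>(
            async _ => await Task.Delay(500),   // stand-in for a real upload
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        var sw = Stopwatch.StartNew();
        for (int i = 0; i < 8; i++) upload.Post(i);
        upload.Complete();
        await upload.Completion;
        return sw.ElapsedMilliseconds;
    }

    static async Task Main() =>
        // 8 items, 4 in flight at a time, 500 ms each: roughly 1 s of wall
        // time, and no thread sits blocked during the delays.
        Console.WriteLine($"Elapsed ~{await RunAsync()} ms");
}
```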

Finally, always be aware of what blocks and what doesn't. In your example, buffer1.Post(...) is not a blocking call, so there is no reason to wrap it in a task.
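A quick way to see this (again assuming the System.Threading.Tasks.Dataflow package): Post() returns true or false immediately, even on a full bounded block, whereas SendAsync() hands back a task that only completes once the block has room — the awaitable way to apply backpressure.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class PostDemo
{
    // Post() never blocks the caller; on a bounded block it simply
    // declines when full. SendAsync() instead parks until space frees.
    public static async Task<(bool, bool, bool)> RunAsync()
    {
        var block = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        bool first = block.Post(1);    // accepted: capacity available
        bool second = block.Post(2);   // declined immediately: block is full

        Task<bool> pending = block.SendAsync(3);  // waits for room
        block.TryReceive(out _);                  // consume one item
        bool third = await pending;               // now accepted

        return (first, second, third);
    }

    static async Task Main() =>
        Console.WriteLine(await RunAsync());  // (True, False, True)
}
```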

I wrote the following sample code using TPL Dataflow:

static void Main(string[] args)
{
    var filePath = "C:\\test.csv";
    var chunkSize = 1024;
    var batchSize = 128;

    var linkCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };

    var uploadData = new ActionBlock<IEnumerable<string>>(
        async (data) =>
        {
            WebClient client = new WebClient();
            var payload = data.SelectMany(x => x).ToArray();
            byte[] bytes = System.Text.Encoding.ASCII.GetBytes(payload);
            //await client.UploadDataTaskAsync("myserver.com", bytes);
            await Task.Delay(2000);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded /* Prefer to limit that to some reasonable value */ });

    var lineBuffer = new BatchBlock<string>(chunkSize);

    var splitData = new TransformManyBlock<IEnumerable<string>, IEnumerable<string>>(
        (data) =>
        {
            // Partition each chunk into smaller chunks grouped on column 1
            var partitions = data.GroupBy(c => c.Split(',')[0]);

            // Further breakdown the chunks into batch size groups
            var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

            // Get batches from groups
            var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));

            // Don't forget to enumerate before returning
            return batches.ToList();
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    lineBuffer.LinkTo(splitData, linkCompletion);
    splitData.LinkTo(uploadData, linkCompletion);

    foreach (var line in File.ReadLines(filePath))
    {
        lineBuffer.Post(line);
    }
    lineBuffer.Complete();

    // Wait for uploads to finish
    uploadData.Completion.Wait();
}