use*_*612 8 c# csv task-parallel-library blockingcollection tpl-dataflow
我编写了以下方法来批处理一个巨大的CSV文件.我们的想法是从文件中读取一大块行到内存中,然后将这些行分成固定大小的批量.获得分区后,将这些分区发送到服务器(同步或异步),这可能需要一段时间.
private static void BatchProcess(string filePath, int chunkSize, int batchSize)
{
List<string> chunk = new List<string>(chunkSize);
foreach (var line in File.ReadLines(filePath))
{
if (chunk.Count == chunk.Capacity)
{
// Partition each chunk into smaller chunks grouped on column 1
var partitions = chunk.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further breakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) =>
new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches from groups
var batches = groups.SelectMany(x => x)
.Select(y => y.Select(z => z)).ToList();
// Process all batches asynchronously
batches.AsParallel().ForAll(async b =>
{
WebClient client = new WebClient();
byte[] bytes = System.Text.Encoding.ASCII
.GetBytes(b.SelectMany(x => x).ToString());
await client.UploadDataTaskAsync("myserver.com", bytes);
});
// clear the chunk
chunk.Clear();
}
chunk.Add(line);
}
}
Run Code Online (Sandbox Code Playgroud)
这段代码似乎不是非常有效的bcoz有两个原因.
从CSV文件读取的主线程被阻止,直到处理完所有分区.
AsParallel阻止所有任务完成.因此,如果线程池中有更多线程可用于工作,我就不会使用它们因为没有任何分区绑定任务.
batchSize是固定的,因此无法更改,但chunkSize可以调整性能.我可以选择足够大的chunkSize,这样没有创建的批处理系统中没有可用的线程,但它仍然意味着Parallel.ForEach方法会阻塞,直到完成所有任务.
如何更改代码,以便系统中的所有可用线程都可以用来执行无空闲的工作.我想我可以使用BlockingCollection来存储批次,但不确定要给它的容量大小,因为每个块中没有批处理是动态的.
关于如何使用TPL来最大化线程利用率的任何想法,以便系统上的大多数可用线程总是在做什么?
更新: 这是我到目前为止使用TPL数据流.它是否正确?
private static void UploadData(string filePath, int chunkSize, int batchSize)
{
var buffer1 = new BatchBlock<string>(chunkSize);
var buffer2 = new BufferBlock<IEnumerable<string>>();
var action1 = new ActionBlock<string[]>(t =>
{
Console.WriteLine("Got a chunk of lines " + t.Count());
// Partition each chunk into smaller chunks grouped on column 1
var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further breakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) =>
new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches from groups
var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
foreach (var batch in batches)
{
buffer2.Post(batch);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
buffer1.LinkTo(action1, new DataflowLinkOptions
{ PropagateCompletion = true });
var action2 = new TransformBlock<IEnumerable<string>,
IEnumerable<string>>(async b =>
{
await ExecuteBatch(b);
return b;
}, new ExecutionDataflowBlockOptions
{ MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
buffer2.LinkTo(action2, new DataflowLinkOptions
{ PropagateCompletion = true });
var action3 = new ActionBlock<IEnumerable<string>>(b =>
{
Console.WriteLine("Finised executing a batch");
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
action2.LinkTo(action3, new DataflowLinkOptions
{ PropagateCompletion = true });
Task produceTask = Task.Factory.StartNew(() =>
{
foreach (var line in File.ReadLines(filePath))
{
buffer1.Post(line);
}
//Once marked complete your entire data flow will signal a stop for
// all new items
Console.WriteLine("Finished producing");
buffer1.Complete();
});
Task.WaitAll(produceTask);
Console.WriteLine("Produced complete");
action1.Completion.Wait();//Will add all the items to buffer2
Console.WriteLine("Action1 complete");
buffer2.Complete();//will not get any new items
action2.Completion.Wait();//Process the batch of 5 and then complete
Task.Wait(action3.Completion);
Console.WriteLine("Process complete");
Console.ReadLine();
}
Run Code Online (Sandbox Code Playgroud)
您已经很接近了,在 TPL 中,数据从一个块流向另一个块,您应该尝试保持这一范式。例如,action1 应该是 TransformManyBlock,因为 anActionBlock
是 an ITargetBlock
(即终止块)。
当您在链接上指定传播完成时,完成事件将自动通过块路由,因此您只需对最后一个块执行一次 wait()。
将其视为多米诺骨牌链,您在第一个块上调用complete,它将通过链传播到最后一个块。
您还应该考虑多线程的内容和原因;你的例子严重依赖I/O,我不认为绑定一堆线程来等待I/O完成是正确的解决方案。
最后,一定要注意什么阻碍了或没有阻碍。在您的示例中buffer1.Post(...)
不是阻塞调用,您没有理由在任务中使用它。
我编写了以下使用 TPL DataFlow 的示例代码:
static void Main(string[] args)
{
var filePath = "C:\\test.csv";
var chunkSize = 1024;
var batchSize = 128;
var linkCompletion = new DataflowLinkOptions
{
PropagateCompletion = true
};
var uploadData = new ActionBlock<IEnumerable<string>>(
async (data) =>
{
WebClient client = new WebClient();
var payload = data.SelectMany(x => x).ToArray();
byte[] bytes = System.Text.Encoding.ASCII.GetBytes(payload);
//await client.UploadDataTaskAsync("myserver.com", bytes);
await Task.Delay(2000);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded /* Prefer to limit that to some reasonable value */ });
var lineBuffer = new BatchBlock<string>(chunkSize);
var splitData = new TransformManyBlock<IEnumerable<string>, IEnumerable<string>>(
(data) =>
{
// Partition each chunk into smaller chunks grouped on column 1
var partitions = data.GroupBy(c => c.Split(',')[0]);
// Further beakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches from groups
var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
// Don't forget to enumerate before returning
return batches.ToList();
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
lineBuffer.LinkTo(splitData, linkCompletion);
splitData.LinkTo(uploadData, linkCompletion);
foreach (var line in File.ReadLines(filePath))
{
lineBuffer.Post(line);
}
lineBuffer.Complete();
// Wait for uploads to finish
uploadData.Completion.Wait();
}
Run Code Online (Sandbox Code Playgroud)