use*_*330 3 c# multithreading task-parallel-library .net-4.5
我需要通过缓冲区将数据缓冲区写入来自不同线程的文件.为了避免锁定,我写入不同的文件,比如'file_1','file_2',最后将所有这些文件合并到'file'.这种做法好吗?还有更好的建议吗?
有些文件非常庞大,包含数千个缓冲区.因此,创建了数千个临时文件,然后进行合并和清理.
我的直觉是按摩文件会很昂贵,管理数千个文件听起来很复杂且容易出错.
相反,有一个专门的线程写作.其他线程只是将其消息添加到等待写入的队列中.虽然会有一些同步开销,但锁中的实际工作量非常小,只需将消息的"指针"复制到队列即可.打开文件并写入它们可能比使用互斥锁更昂贵,实际上可能会提高性能.
这是一个示例方法(没有错误处理!),显示如何使用a BlockingCollection来管理缓冲区队列以写入文件.
我们的想法是创建一个ParallelFileWriter然后在想要写入文件的所有线程中使用它.完成后,只需将其丢弃(但请确保在所有线程完成写入之前不要将其丢弃!).
这只是一个让你入门的简单例子 - 你需要添加参数检查和错误处理:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
public sealed class ParallelFileWriter: IDisposable
{
// maxQueueSize is the maximum number of buffers you want in the queue at once.
// If this value is reached, any threads calling Write() will block until there's
// room in the queue.
public ParallelFileWriter(string filename, int maxQueueSize)
{
_stream = new FileStream(filename, FileMode.Create);
_queue = new BlockingCollection<byte[]>(maxQueueSize);
_writerTask = Task.Run(() => writerTask());
}
public void Write(byte[] data)
{
_queue.Add(data);
}
public void Dispose()
{
_queue.CompleteAdding();
_writerTask.Wait();
_stream.Close();
}
private void writerTask()
{
foreach (var data in _queue.GetConsumingEnumerable())
{
Debug.WriteLine("Queue size = {0}", _queue.Count);
_stream.Write(data, 0, data.Length);
}
}
private readonly Task _writerTask;
private readonly BlockingCollection<byte[]> _queue;
private readonly FileStream _stream;
}
class Program
{
private void run()
{
// For demo purposes, cancel after a couple of seconds.
using (var fileWriter = new ParallelFileWriter(@"C:\TEST\TEST.DATA", 100))
using (var cancellationSource = new CancellationTokenSource(2000))
{
const int NUM_THREADS = 8;
Action[] actions = new Action[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; ++i)
{
int id = i;
actions[i] = () => writer(cancellationSource.Token, fileWriter, id);
}
Parallel.Invoke(actions);
}
}
private void writer(CancellationToken cancellation, ParallelFileWriter fileWriter, int id)
{
int index = 0;
while (!cancellation.IsCancellationRequested)
{
string text = string.Format("{0}:{1}\n", id, index++);
byte[] data = Encoding.UTF8.GetBytes(text);
fileWriter.Write(data);
}
}
static void Main(string[] args)
{
new Program().run();
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2418 次 |
| 最近记录: |