线程安全StreamWriter C#怎么做?2

8 c# thread-safety writer

所以这是我上一个问题的延续 - 所以问题是"构建一个线程安全的程序的最佳方法是什么,它需要将双值写入文件.如果通过streamwriter保存值的函数是多线程调用的?最好的方法是什么?"

我修改了一些在MSDN上找到的代码,以下怎么样?这个正确地将所有内容写入文件.

namespace SafeThread
{
    class Program
    {
        static void Main()
        {
            Threading threader = new Threading();

            AutoResetEvent autoEvent = new AutoResetEvent(false);

            Thread regularThread =
                new Thread(new ThreadStart(threader.ThreadMethod));
            regularThread.Start();

            ThreadPool.QueueUserWorkItem(new WaitCallback(threader.WorkMethod),
                autoEvent);

            // Wait for foreground thread to end.
            regularThread.Join();

            // Wait for background thread to end.
            autoEvent.WaitOne();
        }
    }


    class Threading
    {
        List<double> Values = new List<double>();
        static readonly Object locker = new Object();
        StreamWriter writer = new StreamWriter("file");
        static int bulkCount = 0;
        static int bulkSize = 100000;

        public void ThreadMethod()
        {
            lock (locker)
            {
                while (bulkCount < bulkSize)
                    Values.Add(bulkCount++);
            }
            bulkCount = 0;
        }

        public void WorkMethod(object stateInfo)
        {
            lock (locker)
            {
                foreach (double V in Values)
                {
                    writer.WriteLine(V);
                    writer.Flush();
                }
            }
            // Signal that this thread is finished.
            ((AutoResetEvent)stateInfo).Set();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Ste*_*ary 15

Thread并且QueueUserWorkItem用于线程化最低可用API.除非我绝对没有其他选择,否则我不会使用它们.尝试使用Task更高级别的抽象类.有关详细信息,请参阅我最近关于该主题的博客文章.

您还可以将其BlockingCollection<double>用作正确的生产者/消费者队列,而不是尝试使用最低的可用API进行同步构建.

正确地重新发明这些轮子是非常困难的.我强烈建议使用为此类需求Task而设计的类(并且BlockingCollection具体而言).它们内置于.NET 4.0框架中,可作为.NET 3.5的附加组件使用.


Jam*_*ing 6

  • 代码将writer作为实例var但使用静态锁定器.如果您有多个实例写入不同的文件,他们没有理由需要共享相同的锁
  • 在相关的说明中,由于您已经拥有了编写器(作为私有实例var),您可以使用它来锁定而不是在这种情况下使用单独的锁定器对象 - 这使事情变得更简单.

"正确答案"实际上取决于您在锁定/阻止行为方面所寻求的内容.例如,最简单的方法是跳过中间数据结构,只需要一个WriteValues方法,这样每个线程"报告"其结果,然后将它们写入文件.就像是:

StreamWriter writer = new StreamWriter("file");
public void WriteValues(IEnumerable<double> values)
{
    lock (writer)
    {
        foreach (var d in values)
        {
            writer.WriteLine(d);
        }
        writer.Flush();
    }
}
Run Code Online (Sandbox Code Playgroud)

当然,这意味着工作线程在其"报告结果"阶段进行序列化 - 取决于性能特征,虽然可能很好(例如,生成5分钟,写入500毫秒).

另一方面,您将工作线程写入数据结构.如果您使用的是.NET 4,我建议您使用ConcurrentQueue而不是自己锁定.

此外,您可能希望以比工作线程报告的更大批量执行文件i/o,因此您可能选择仅在某个频率上在后台线程中写入.频谱的那一端看起来像下面的那样(你将删除实际代码中的Console.WriteLine调用,那些就是那里你可以看到它在运行中)

public class ThreadSafeFileBuffer<T> : IDisposable
{
    private readonly StreamWriter m_writer;
    private readonly ConcurrentQueue<T> m_buffer = new ConcurrentQueue<T>();
    private readonly Timer m_timer;

    public ThreadSafeFileBuffer(string filePath, int flushPeriodInSeconds = 5)
    {
        m_writer = new StreamWriter(filePath);
        var flushPeriod = TimeSpan.FromSeconds(flushPeriodInSeconds);
        m_timer = new Timer(FlushBuffer, null, flushPeriod, flushPeriod);
    }

    public void AddResult(T result)
    {
        m_buffer.Enqueue(result);
        Console.WriteLine("Buffer is up to {0} elements", m_buffer.Count);
    }

    public void Dispose()
    {
        Console.WriteLine("Turning off timer");
        m_timer.Dispose();
        Console.WriteLine("Flushing final buffer output");
        FlushBuffer(); // flush anything left over in the buffer
        Console.WriteLine("Closing file");
        m_writer.Dispose();
    }

    /// <summary>
    /// Since this is only done by one thread at a time (almost always the background flush thread, but one time via Dispose), no need to lock
    /// </summary>
    /// <param name="unused"></param>
    private void FlushBuffer(object unused = null)
    {
        T current;
        while (m_buffer.TryDequeue(out current))
        {
            Console.WriteLine("Buffer is down to {0} elements", m_buffer.Count);
            m_writer.WriteLine(current);
        }
        m_writer.Flush();
    }
}

class Program
{
    static void Main(string[] args)
    {
        var tempFile = Path.GetTempFileName();
        using (var resultsBuffer = new ThreadSafeFileBuffer<double>(tempFile))
        {
            Parallel.For(0, 100, i =>
            {
                // simulate some 'real work' by waiting for awhile
                var sleepTime = new Random().Next(10000);
                Console.WriteLine("Thread {0} doing work for {1} ms", Thread.CurrentThread.ManagedThreadId, sleepTime);
                Thread.Sleep(sleepTime);
                resultsBuffer.AddResult(Math.PI*i);
            });
        }
        foreach (var resultLine in File.ReadAllLines(tempFile))
        {
            Console.WriteLine("Line from result: {0}", resultLine);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)