C#Parallel.foreach - 使变量线程安全

dal*_*mac 2 c# locking task-parallel-library parallel.foreach

我一直在重写一些流程密集型循环来使用TPL来提高速度.这是我第一次尝试线程,所以想要检查我正在做的是正确的方法.

结果很好 - DataTable从标准foreach循环转换为Parallel.ForEach循环时,处理1000行中的数据已将处理时间从34分钟减少到9分钟.对于此测试,我删除了非线程安全操作,例如将数据写入日志文件并递增计数器.

我仍然需要写回日志文件并增加一个计数器,所以我尝试实现一个包含streamwriter/increment代码块的锁.

FileStream filestream = new FileStream("path_to_file.txt", FileMode.Create);
StreamWriter streamwriter = new StreamWriter(filestream);
streamwriter.AutoFlush = true;

try
{
    object locker = new object();

    // Lets assume we have a DataTable containing 1000 rows of data.
    DataTable datatable_results;

    if (datatable_results.Rows.Count > 0)
    {
        int row_counter = 0;

        Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
        { 
            // Process data_row as normal.

            // When ready to write to log, do so.
            lock (locker)
            {
                row_counter++;
                streamwriter.WriteLine("Processing row: {0}", row_counter);

                // Write any data we want to log.
            }
        });                    
    }
}
catch (Exception e)
{
    // Catch the exception.
}
streamwriter.Close();
Run Code Online (Sandbox Code Playgroud)

以上似乎按预期工作,性能成本最低(仍然是9分钟的执行时间).当然,锁中包含的操作本身并不重要 - 我假设随着锁中处理代码所花费的时间增加,线程锁定的时间越长,它对处理时间的影响就越大.

我的问题:以上是一种有效的方法,或者是否有一种不同的方式来实现上述更快或更安全?

另外,假设我们的原始DataTable实际包含30000行.有没有什么可以通过将其DataTable分成每行1000行的块然后处理它们Parallel.ForEach而不是一次处理所有300000行来获得?

Sri*_*vel 5

写入文件是昂贵的,你在写入文件时持有一个独占锁,这很糟糕.它会引入争论.

您可以将其添加到缓冲区中,然后一次性写入文件.这应该消除争用并提供扩展方式.

if (datatable_results.Rows.Count > 0)
{
    ConcurrentQueue<string> buffer = new ConcurrentQueue<string>();
    Parallel.ForEach(datatable_results.AsEnumerable(), (data_row, state, index) =>
    {
        // Process data_row as normal.

        // When ready to write to log, do so.

        buffer.Enqueue(string.Format( "Processing row: {0}", index));
    });

    streamwriter.AutoFlush = false;
    string line;
    while (buffer.TryDequeue(out line))
    {
        streamwriter.WriteLine(line);
    }
    streamwriter.Flush();//Flush once when needed
}
Run Code Online (Sandbox Code Playgroud)
  1. 请注意,您不需要维护循环计数器,只需 Parallel.ForEach提供一个.区别在于它不是计数器而是索引.如果我改变了预期的行为,您仍然可以添加计数器并使用Interlocked.Increment它来递增它.
  2. 我看到你正在使用streamwriter.AutoFlush = true,这会损害性能,你可以设置它false并在你写完所有数据后冲洗它.

如果可能的话,包装StreamWriterin using语句,这样你甚至不需要刷新流(你可以免费获得).

或者,您可以查看完成其工作的日志框架.示例:NLog,Log4net等