CorrelationManager.LogicalOperationStack是否与Parallel.For,Tasks,Threads等兼容

wag*_*ghe 10 .net c# system.diagnostics parallel-extensions task-parallel-library

有关背景信息,请参阅此问题:

任务并行库中的任务如何影响ActivityID?

该问题询问Tasks如何影响Trace.CorrelationManager.ActivityId.@Greg Samson用测试程序回答了他自己的问题,显示ActivityId在Tasks的上下文中是可靠的.测试程序在Task委托的开头设置一个ActivityId,休眠以模拟工作,然后检查最后的ActivityId以确保它是相同的值(即它没有被另一个线程修改).该程序成功运行.

在研究线程,任务和并行操作的其他"上下文"选项(最终为日志提供更好的上下文)时,我遇到了Trace.CorrelationManager.LogicalOperationStack的一个奇怪问题(无论如何我都很奇怪).我在下面的问题中复制了我的"答案".

我认为它充分描述了我遇到的问题(Trace.CorrelationManager.LogicalOperationStack显然已经损坏 - 或者什么 - 当在Parallel.For的上下文中使用时,但只有当Parallel.For本身包含在逻辑操作中时) .

这是我的问题:

  1. Trace.CorrelationManager.LogicalOperationStack应该可以与Parallel.For一起使用吗?如果是这样,如果一个逻辑操作已经与Parallel.For启动有效,它是否会有所不同?

  2. 是否有一种"正确"的方式使用LogicalOperationStack与Parallel.For?我能不同地对这个示例程序进行编码以使其"有效"吗?通过"工作",我的意思是LogicalOperationStack总是具有预期的条目数,并且条目本身是预期的条目.

我已经使用Threads和ThreadPool线程做了一些额外的测试,但是我必须返回并重试这些测试,看看我是否遇到了类似的问题.

我会说,看起来任务/并行线程和ThreadPool线程确实从父线程"继承"了Trace.CorrelationManager.ActivityId和Trace.CorrelationManager.LogicalOperationStack值.这是预期的,因为CorrelationManager使用CallContext的LogicalSetData方法(而不是SetData)存储这些值.

请再次参考此问题,以获取我在下面发布的"答案"的原始背景:

任务并行库中的任务如何影响ActivityID?

另请参阅Microsoft的Parallel Extensions论坛上的类似问题(目前尚未得到解答):

http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

[开始粘贴]

请原谅我发布这个作为答案,因为它不是你的问题的真正答案,但是,它与你的问题有关,因为它处理CorrelationManager行为和线程/任务/等.我一直在寻找使用CorrelationManager LogicalOperationStack(和StartLogicalOperation/StopLogicalOperation方法)在多线程场景中提供额外的上下文.

我拿了你的例子并稍微修改它以增加使用Parallel.For并行执行工作的能力.另外,我用StartLogicalOperation/StopLogicalOperation括号(内部)DoLongRunningWork.从概念上讲,DoLongRunningWork每次执行时都会执行以下操作:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation
Run Code Online (Sandbox Code Playgroud)

我发现如果我将这些逻辑操作添加到您的代码中(或多或少),所有逻辑操作都保持同步(始终是堆栈上预期的操作数,并且堆栈上的操作值始终为预期).

在我自己的一些测试中,我发现并非总是这样.逻辑操作堆栈正在"损坏".我能想到的最好的解释是,当"子"线程退出时,将CallContext信息"合并"回"父"线程上下文导致"旧"子线程上下文信息(逻辑操作)为"继承"由另一个兄弟姐妹线程.

问题也可能与Parallel.For显然使用主线程(至少在示例代码中,如编写)作为"工作线程"之一(或者在并行域中应该调用它们)之间的事实有关.每当执行DoLongRunningWork时,就会启动一个新的逻辑操作(在开始时)并停止(在结束时)(也就是说,将其推送到LogicalOperationStack并从中弹出).如果主线程已经有效的逻辑操作,并且DoLongRunningWork在主线程上执行,则启动新的逻辑操作,因此主线程的LogicalOperationStack现在具有两个操作.DoLongRunningWork的任何后续执行(只要DoLongRunningWork的这个"迭代"在主线程上执行)将(显然)继承主线程的LogicalOperationStack(现在它有两个操作,而不仅仅是一个预期的操作).

我花了很长时间才弄清楚为什么LogicalOperationStack的行为在我的示例中与我的示例的修改版本不同.最后我看到在我的代码中我将整个程序放在逻辑操作中,而在我的测试程序的修改版本中,我没有.这意味着在我的测试程序中,每次执行"工作"(类似于DoLongRunningWork)时,已经存在逻辑操作.在我的测试程序的修改版本中,我没有在逻辑操作中将整个程序括起来.

所以,当我修改你的测试程序以在逻辑操作中包含整个程序时如果我使用Parallel.For,我遇到了完全相同的问题.

使用上面的概念模型,这将成功运行:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
Run Code Online (Sandbox Code Playgroud)

虽然这最终会因为LogicalOperationStack显然不同步而断言:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation
Run Code Online (Sandbox Code Playgroud)

这是我的示例程序.它类似于你的,因为它有一个DoLongRunningWork方法来操作ActivityId以及LogicalOperationStack.我也有两种踢DoLongRunningWork的方式.一种风味使用任务一使用Parallel.For.还可以执行每种风格,使得整个并行操作被包含在逻辑操作中或不包含在逻辑操作中.因此,总共有4种方法来执行并行操作.要尝试每个,只需取消注释所需的"使用..."方法,重新编译并运行. UseTasks,UseTasks(true)并且UseParallelFor应该全部运行完成. UseParallelFor(true)因为LogicalOperationStack没有预期的条目数,所以会在某些时候断言.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}
Run Code Online (Sandbox Code Playgroud)

如果LogicalOperationStack可以与Parallel.For(和/或其他线程/任务构造)一起使用或者如何使用它的整个问题可能有其自身的问题.也许我会发一个问题.在此期间,我想知道你是否对此有任何想法(或者,我想知道你是否考虑过使用LogicalOperationStack,因为ActivityId似乎是安全的).

[结束糊]

有没有人对这个问题有任何想法?

wag*_*ghe 5

[开始更新]

我还在微软的.Net支持论坛的Parallel Extensions上提出了这个问题,并最终收到了Stephen Toub回答.事实证明,LogicalCallContext中存在导致LogicalOperationStack损坏的错误.还有一个很好的描述(在Stephen的回复中,我对他的回答做了回复),简要介绍了Parallel.For如何处理任务以及为何使Parallel.For容易受到bug的影响.

在我的回答中,我推测LogicalOperationStack与Parallel.For不兼容,因为Parallel.For使用主线程作为"工作"线程之一.根据斯蒂芬的解释,我的推测是错误的.Parallel.For确实使用主线程作为"工作"线程之一,但它不是简单地"按原样"使用.第一个Task在主线程上运行,但运行方式就好像它在新线程上运行一样.阅读Stephen的描述以获取更多信息.

[结束更新]

据我所知,答案如下:

ActivityId和LogicalOperationStack都通过CallContext.LogicalSetData存储.这意味着这些值将"流向"任何"子"线程.这很酷,例如,可以在入口点将ActivityId设置为多线程服务器(比如服务调用),并且最终从该入口点启动的所有线程都可以是同一"活动"的一部分.类似地,逻辑操作(通过LogicalOperationStack)也流向子线程.

关于Trace.CorrelationManager.ActivityId:

ActivityId似乎与我测试过的所有线程模型兼容:直接使用线程,使用ThreadPool,使用Tasks,使用Parallel.*.在所有情况下,ActivityId都具有预期值.

关于Trace.CorrelationManager.LogicalOperationStack:

LogicalOperationStack似乎与大多数线程模型兼容,但不兼容Parallel.*.直接使用线程,ThreadPool和Tasks,LogicalOperationStack(在我的问题中提供的示例代码中操作)保持其完整性.在任何时候LogicalOperationStack的内容都是预期的.

LogicalOperationStack与Parallel.For不兼容.如果逻辑操作"有效",即在调用Parallel.*操作之前调用CorrelationManager.StartLogicalOperation,然后在Paralle.*(即代理中)的上下文中启动新的逻辑操作. ,然后LogicalOperationStack将被破坏.(我应该说它可能会被破坏.并行.*可能不会创建任何额外的线程,这意味着LogicalOperationStack将是安全的).

问题源于Parallel.*使用主线程(或者更确切地说,启动并行操作的线程)作为其"工作"线程之一的事实.这意味着当"逻辑操作"在与"主"线程相同的"工作"线程中启动和停止时,正在修改"主"线程的LogicalOperationStack.即使调用代码(即委托)正确维护堆栈(确保每个StartLogicalOperation都使用相应的StopLogicalOperation"停止"),也会修改"主"线程堆栈.最终似乎(对我而言),"主"线程的LogicalOperationStack基本上被两个不同的"逻辑"线程修改:"主"线程和"工作"线程,它们都恰好是相同的线.

我不知道究竟为什么这不起作用的深层细节(至少我希望它有效).我最好的猜测是每次委托在一个线程上执行(与主线程不同),线程"继承"主线程的LogicalOperationStack的当前状态.如果委托当前正在主线程上执行(被重用为工作线程),并且已经开始逻辑操作,那么其他并行化委托中的一个(或多个)将"继承"现在具有的主线程的LogicalOperationStack一个(或多个)新逻辑操作生效!

FWIW,我实现了(主要用于测试,我目前还没有使用它),下面的"逻辑堆栈"来模仿LogicalOperationStack,但这样做可以使它与Parallel一起使用.*随意尝试它出来和/或使用它.要测试,请将呼叫替换为

Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation
Run Code Online (Sandbox Code Playgroud)

在我的原始问题的示例代码中调用

LogicalOperation.OperationStack.Push()/Pop().


//OperationStack.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using System.Runtime.Remoting.Messaging;

namespace LogicalOperation
{
  public static class OperationStack
  {
    private const string OperationStackSlot = "OperationStackSlot";

    public static IDisposable Push(string operation)
    {
      OperationStackItem parent = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      OperationStackItem op = new OperationStackItem(parent, operation);
      CallContext.LogicalSetData(OperationStackSlot, op);
      return op;
    }

    public static object Pop()
    {
      OperationStackItem current = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;

      if (current != null)
      {
        CallContext.LogicalSetData(OperationStackSlot, current.Parent);
        return current.Operation;
      }
      else
      {
        CallContext.FreeNamedDataSlot(OperationStackSlot);
      }
      return null;
    }

    public static object Peek()
    {
      OperationStackItem top = Top();
      return top != null ? top.Operation : null;
    }

    internal static OperationStackItem Top()
    {
      OperationStackItem top = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      return top;
    }

    public static IEnumerable<object> Operations()
    {
      OperationStackItem current = Top();
      while (current != null)
      {
        yield return current.Operation;
        current = current.Parent;
      }
    }

    public static int Count
    {
      get
      {
        OperationStackItem top = Top();
        return top == null ? 0 : top.Depth;
      }
    }

    public static IEnumerable<string> OperationStrings()
    {
      foreach (object o in Operations())
      {
        yield return o.ToString();
      }
    }
  }
}


//OperationStackItem.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace LogicalOperation
{
  public class OperationStackItem : IDisposable
  {
    private OperationStackItem parent = null;
    private object operation;
    private int depth;
    private bool disposed = false;

    internal OperationStackItem(OperationStackItem parentOperation, object operation)
    {
      parent = parentOperation;
      this.operation = operation;
      depth = parent == null ? 1 : parent.Depth + 1;
    }

    internal object Operation { get { return operation; } }
    internal int Depth { get { return depth; } }

    internal OperationStackItem Parent { get { return parent; } }

    public override string ToString()
    {
      return operation != null ? operation.ToString() : "";
    }

    #region IDisposable Members

    public void Dispose()
    {
      if (disposed) return;

      OperationStack.Pop();

      disposed = true;
    }

    #endregion
  }
}
Run Code Online (Sandbox Code Playgroud)

这是受到Brent VanderMeide描述的范围对象的启发:http://www.dnrtv.com/default.aspx?showNum = 114

您可以像这样使用此类:

public void MyFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFunc"))
  {
    MyOtherFunc();
  }
}

public void MyOtherFunc()
{
  using (LogicalOperation.OperationStack.Push("MyOtherFunc"))
  {
    MyFinalFunc();
  }
}

public void MyFinalFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFinalFunc"))
  {
    Console.WriteLine("Hello");
  }
}
Run Code Online (Sandbox Code Playgroud)