TPL 数据流的 AsyncLocal 值不正确

Aza*_*zat 5 .net c# tpl-dataflow .net-core

考虑这个例子:

class Program

{
    private static readonly ITargetBlock<string> Mesh = CreateMesh();
    private static readonly AsyncLocal<string> AsyncLocalContext
        = new AsyncLocal<string>();

    static async Task Main(string[] args)
    {
        var tasks = Enumerable.Range(1, 4)
            .Select(ProcessMessage);
        await Task.WhenAll(tasks);

        Mesh.Complete();
        await Mesh.Completion;

        Console.WriteLine();
        Console.WriteLine("Done");
    }

    private static async Task ProcessMessage(int number)
    {
        var param = number.ToString();
        using (SetScopedAsyncLocal(param))
        {
            Console.WriteLine($"Before send {param}");
            await Mesh.SendAsync(param);
            Console.WriteLine($"After send {param}");
        }
    }

    private static IDisposable SetScopedAsyncLocal(string value)
    {
        AsyncLocalContext.Value = value;

        return new Disposer(() => AsyncLocalContext.Value = null);
    }

    private static ITargetBlock<string> CreateMesh()
    {
        var blockOptions = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = DataflowBlockOptions.Unbounded,
            EnsureOrdered = false,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };

        var block1 = new TransformBlock<string, string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block1 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");

            return input;
        }, blockOptions);

        var block2 = new TransformBlock<string, string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block2 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");

            return input;
        }, blockOptions);

        var block3 = new ActionBlock<string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block3 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");
        }, blockOptions);

        var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};

        block1.LinkTo(block2, linkOptions);
        block2.LinkTo(block3, linkOptions);

        return new EncapsulatedActionBlock<string>(block1, block3.Completion);
    }
}

internal class EncapsulatedActionBlock<T> : ITargetBlock<T>
{
    private readonly ITargetBlock<T> _wrapped;

    public EncapsulatedActionBlock(ITargetBlock<T> wrapped, Task completion)
    {
        _wrapped = wrapped;
        Completion = completion;
    }

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept) =>
        _wrapped.OfferMessage(messageHeader, messageValue, source, consumeToAccept);

    public void Complete() => _wrapped.Complete();

    public void Fault(Exception exception) => _wrapped.Fault(exception);

    public Task Completion { get; }
}

internal class Disposer : IDisposable
{
    private readonly Action _disposeAction;

    public Disposer(Action disposeAction)
    {
        _disposeAction = disposeAction
            ?? throw new ArgumentNullException(nameof(disposeAction));
    }

    public void Dispose()
    {
        _disposeAction();
    }
}

Run Code Online (Sandbox Code Playgroud)

执行的结果将类似于:

发送前 1
发送1后
发送前 2
发送2后
发送前 3
发送3后
发送前 4
发送4后
   Block1 [线程 9] 输入:3 - 上下文:3。
   Block1 [线程 10] 输入:2 - 上下文:1。
   Block1 [线程 8] 输入:4 - 上下文:4。
   Block1 [线程 11] 输入:1 - 上下文:2。
   Block2 [线程 9] 输入:2 - 上下文:3。
   Block2 [线程 7] 输入:1 - 上下文:2。
   Block2 [线程 10] 输入:3 - 上下文:3。
   Block2 [线程 8] 输入:4 - 上下文:4。
   Block3 [线程 11] 输入:4 - 上下文:4。
   Block3 [线程 7] 输入:1 - 上下文:2。
   Block3 [线程 9] 输入:3 - 上下文:3。
   Block3 [线程 4] 输入:2 - 上下文:3。

完毕

正如您所看到的,在移动到第二个 TDF 块后,传递的上下文值和存储的上下文值并不总是相同。此行为会破坏多个日志框架的 LogContext 功能使用。

  1. 这是预期的行为吗(请解释原因)?
  2. TPL 数据流是否以某种方式弄乱了执行上下文?

The*_*ias 3

要了解发生了什么,您必须了解 Dataflow 块的工作原理。它们内部没有阻塞的线程等待消息到达。处理由工作任务完成。让我们考虑 的简单(默认)情况MaxDegreeOfParallelism = 1。最初,工人任务为零。当使用 异步发布消息时SendAsync,发布该消息的同一任务将成为辅助任务并开始处理该消息。如果在处理第一条消息时发布另一条消息,则会发生其他情况。第二条消息将排入块的输入队列中,并且发布它的任务将完成。第二条消息将由处理第一条消息的工作任务处理。只要队列中有消息,初始工作任务就会挑选它们并一一处理它们。如果在某个时刻没有更多缓冲消息,则工作任务将完成,并且块将返回其初始状态(零工作任务)。接下来SendAsync将成为新的工作任务,依此类推。对于MaxDegreeOfParallelism = 1,在任何给定时刻只能存在一个工作任务。

让我们用一个例子来演示这一点。下面是一个ActionBlock以延迟 X 提供的消息,并以延迟 Y 处理每条消息。

private static void ActionBlockTest(int sendDelay, int processDelay)
{
    Console.WriteLine($"SendDelay: {sendDelay}, ProcessDelay: {processDelay}");
    var asyncLocal = new AsyncLocal<int>();
    var actionBlock = new ActionBlock<int>(async i =>
    {
        await Task.Delay(processDelay);
        Console.WriteLine($"Processed {i}, Context: {asyncLocal.Value}");
    });
    Task.Run(async () =>
    {
        foreach (var i in Enumerable.Range(1, 5))
        {
            asyncLocal.Value = i;
            await actionBlock.SendAsync(i);
            await Task.Delay(sendDelay);
        }
    }).Wait();
    actionBlock.Complete();
    actionBlock.Completion.Wait();
}
Run Code Online (Sandbox Code Playgroud)

让我们看看如果我们快速发送消息并缓慢处理它们会发生什么:

ActionBlockTest(100, 200); // .NET Core 3.0
Run Code Online (Sandbox Code Playgroud)

SendDelay:100,ProcessDelay:200
已处理 1,上下文:1
已处理 2,上下文:1
已处理 3,上下文:1
已处理 4,上下文:1
已处理 5,上下文:1

上下文AsyncLocal保持不变,因为同一个工作任务处理了所有消息。

现在让我们缓慢发送消息并快速处理它们:

ActionBlockTest(200, 100); // .NET Core 3.0
Run Code Online (Sandbox Code Playgroud)

SendDelay:200,ProcessDelay:100
已处理 1,上下文:1
已处理 2,上下文:2
已处理 3,上下文:3
已处理 4,上下文:4
已处理 5,上下文:5

每条消息的上下文AsyncLocal都不同,因为每条消息都是由不同的工作任务处理的。

这个故事的道德教训是,每个人都SendAsync不能保证创建一个遵循消息的单个异步工作流程,直到其旅程结束、管道结束。因此该类AsyncLocal不能用于保存每条消息的环境数据。