如何防止Task上的同步延续?

Mar*_*ell 80 .net c# task task-parallel-library async-await

我有一些库(套接字网络)代码,它Task基于提供基于API的待处理请求响应TaskCompletionSource<T>.然而,TPL中的一个烦恼是,似乎不可能阻止同步延续.我会希望能够做的是两种:

  • 告诉a TaskCompletionSource<T>不应该允许呼叫者附加TaskContinuationOptions.ExecuteSynchronously,或
  • 使用池来设置结果(SetResult/ TrySetResult)以指定TaskContinuationOptions.ExecuteSynchronously应该忽略的方式

具体来说,我遇到的问题是传入的数据正在由专用的阅读器处理,如果调用者可以附加,TaskContinuationOptions.ExecuteSynchronously他们可以阻止阅读器(这不仅影响它们).以前,我通过一些hackery解决了这个问题,它检测是否存在任何延续,如果它们将完成推送到ThreadPool,那么如果调用者已经使工作队列饱和​​,则会产生重大影响,因为完成将不会被处理及时.如果他们使用Task.Wait()(或类似),他们将基本上陷入僵局.同样,这就是读者使用专用线程而不是使用工作者的原因.

所以; 在我尝试唠叨TPL团队之前:我错过了一个选项吗?

关键点:

  • 我不希望外部呼叫者能够劫持我的线程
  • 我不能使用它ThreadPool作为一个实现,因为它需要在池饱和时工作

以下示例生成输出(排序可能因时间而异):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool
Run Code Online (Sandbox Code Playgroud)

问题在于随机调用者设法在"主线程"上获得延续.在实际代码中,这将打断主要读者; 坏事!

码:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

Eli*_*bel 48

.NET 4.6中的新功能:

.NET 4.6包含一个新的TaskCreationOptions:RunContinuationsAsynchronously.


既然你愿意使用Reflection访问私有字段......

您可以使用标记标记TCS的任务TASK_STATE_THREAD_WAS_ABORTED,这将导致所有延续不被内联.

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);
Run Code Online (Sandbox Code Playgroud)

编辑:

我建议您使用表达式,而不是使用Reflection emit.这更具可读性,并且具有与PCL兼容的优点:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();
Run Code Online (Sandbox Code Playgroud)

不使用反射:

如果有人感兴趣的话,我已经找到了一种没有反射的方法来做到这一点,但它也有点"肮脏",当然还有一个不可忽视的性能惩罚:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}
Run Code Online (Sandbox Code Playgroud)

  • @MarcGravell使用它为TPL团队创建一些伪样本,并通过构造函数选项或其他内容发出更改请求. (3认同)
  • @AdamHouldsworth,别担心,我已经给他们发了同样的电子邮件; p (2认同)

nos*_*tio 9

我不认为TPL中有任何内容可以提供对延续的显式 API控制TaskCompletionSource.SetResult.我决定保留我最初的答案来控制async/await场景的这种行为.

这是另一个强加异步的解决方案ContinueWith,如果tcs.SetResult-triggered延续发生在SetResult被调用的同一个线程上:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}
Run Code Online (Sandbox Code Playgroud)

更新以解决评论:

我不控制调用者 - 我无法让他们使用特定的continue-with变体:如果可以的话,问题就不会存在于第一位

我不知道你不控制来电者.但是,如果你不控制它,你可能也没有将TaskCompletionSource对象直接传递给调用者.从逻辑上讲,你将传递它的令牌部分,即tcs.Task.在这种情况下,通过向上面添加另一个扩展方法,解决方案可能更容易:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}
Run Code Online (Sandbox Code Playgroud)

使用:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);
Run Code Online (Sandbox Code Playgroud)

这实际上适用于awaitContinueWith(小提琴)并且没有反射黑客.