Mar*_*ell 80 .net c# task task-parallel-library async-await
我有一些库(套接字网络)代码,它Task基于提供基于API的待处理请求响应TaskCompletionSource<T>.然而,TPL中的一个烦恼是,似乎不可能阻止同步延续.我会希望能够做的是两种:
TaskCompletionSource<T>不应该允许呼叫者附加TaskContinuationOptions.ExecuteSynchronously,或SetResult/ TrySetResult)以指定TaskContinuationOptions.ExecuteSynchronously应该忽略的方式具体来说,我遇到的问题是传入的数据正在由专用的阅读器处理,如果调用者可以附加,TaskContinuationOptions.ExecuteSynchronously他们可以阻止阅读器(这不仅影响它们).以前,我通过一些hackery解决了这个问题,它检测是否存在任何延续,如果它们将完成推送到ThreadPool,那么如果调用者已经使工作队列饱和,则会产生重大影响,因为完成将不会被处理及时.如果他们使用Task.Wait()(或类似),他们将基本上陷入僵局.同样,这就是读者使用专用线程而不是使用工作者的原因.
所以; 在我尝试唠叨TPL团队之前:我错过了一个选项吗?
关键点:
ThreadPool作为一个实现,因为它需要在池饱和时工作以下示例生成输出(排序可能因时间而异):
Continuation on: Main thread
Press [return]
Continuation on: Thread pool
Run Code Online (Sandbox Code Playgroud)
问题在于随机调用者设法在"主线程"上获得延续.在实际代码中,这将打断主要读者; 坏事!
码:
using System;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
static void Identify()
{
var thread = Thread.CurrentThread;
string name = thread.IsThreadPoolThread
? "Thread pool" : thread.Name;
if (string.IsNullOrEmpty(name))
name = "#" + thread.ManagedThreadId;
Console.WriteLine("Continuation on: " + name);
}
static void Main()
{
Thread.CurrentThread.Name = "Main thread";
var source = new TaskCompletionSource<int>();
var task = source.Task;
task.ContinueWith(delegate {
Identify();
});
task.ContinueWith(delegate {
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
source.TrySetResult(123);
Console.WriteLine("Press [return]");
Console.ReadLine();
}
}
Run Code Online (Sandbox Code Playgroud)
Eli*_*bel 48
.NET 4.6中的新功能:
.NET 4.6包含一个新的TaskCreationOptions:RunContinuationsAsynchronously.
既然你愿意使用Reflection访问私有字段......
您可以使用标记标记TCS的任务TASK_STATE_THREAD_WAS_ABORTED,这将导致所有延续不被内联.
const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;
var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);
Run Code Online (Sandbox Code Playgroud)
编辑:
我建议您使用表达式,而不是使用Reflection emit.这更具可读性,并且具有与PCL兼容的优点:
var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
Expression.Lambda<Action<Task>>(
Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();
Run Code Online (Sandbox Code Playgroud)
不使用反射:
如果有人感兴趣的话,我已经找到了一种没有反射的方法来做到这一点,但它也有点"肮脏",当然还有一个不可忽视的性能惩罚:
try
{
Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
source.TrySetResult(123);
Thread.ResetAbort();
}
Run Code Online (Sandbox Code Playgroud)
我不认为TPL中有任何内容可以提供对延续的显式 API控制TaskCompletionSource.SetResult.我决定保留我最初的答案来控制async/await场景的这种行为.
这是另一个强加异步的解决方案ContinueWith,如果tcs.SetResult-triggered延续发生在SetResult被调用的同一个线程上:
public static class TaskExt
{
static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
new ConcurrentDictionary<Task, Thread>();
// SetResultAsync
static public void SetResultAsync<TResult>(
this TaskCompletionSource<TResult> @this,
TResult result)
{
s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
try
{
@this.SetResult(result);
}
finally
{
Thread thread;
s_tcsTasks.TryRemove(@this.Task, out thread);
}
}
// ContinueWithAsync, TODO: more overrides
static public Task ContinueWithAsync<TResult>(
this Task<TResult> @this,
Action<Task<TResult>> action,
TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
{
return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
{
Thread thread = null;
s_tcsTasks.TryGetValue(t, out thread);
if (Thread.CurrentThread == thread)
{
// same thread which called SetResultAsync, avoid potential deadlocks
// using thread pool
return Task.Run(() => action(t));
// not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
// return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
}
else
{
// continue on the same thread
var task = new Task(() => action(t));
task.RunSynchronously();
return Task.FromResult(task);
}
}), continuationOptions).Unwrap();
}
}
Run Code Online (Sandbox Code Playgroud)
更新以解决评论:
我不控制调用者 - 我无法让他们使用特定的continue-with变体:如果可以的话,问题就不会存在于第一位
我不知道你不控制来电者.但是,如果你不控制它,你可能也没有将TaskCompletionSource对象直接传递给调用者.从逻辑上讲,你将传递它的令牌部分,即tcs.Task.在这种情况下,通过向上面添加另一个扩展方法,解决方案可能更容易:
// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
{
Thread thread = null;
s_tcsTasks.TryGetValue(antecedent, out thread);
if (Thread.CurrentThread == thread)
{
// continue on a pool thread
return antecedent.ContinueWith(t => t,
TaskContinuationOptions.None).Unwrap();
}
else
{
return antecedent;
}
}), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}
Run Code Online (Sandbox Code Playgroud)
使用:
// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ...
// client code
task.ContinueWith(delegate
{
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
// ...
// library code
source.SetResultAsync(123);
Run Code Online (Sandbox Code Playgroud)
这实际上适用于await和ContinueWith(小提琴)并且没有反射黑客.
| 归档时间: |
|
| 查看次数: |
7931 次 |
| 最近记录: |