Joe*_*gen 4 c# android task-parallel-library android-asynctask
我正致力于在C#(.NET 4.0)中使用TPL.
我创建了一个自定义API来简化Web请求的创建和下载内容(异步,使用延续任务).那部分工作正常.
当我尝试使用延迟任务创建LimitedConcurrencyLevelTaskScheduler(在并行编程的示例和任务的MSDN文档中找到)时,我遇到了问题.如果您不熟悉该类,那么它所做的就是限制计划任意数量的任务的并发度.
基本上我想将Web请求任务链的创建推迟到由我们调度的任务中,LimitedConcurrencyLevelTaskScheduler以便我可以限制并发下载的数量.
正如圣人Stephen Toub所建议的那样,当推迟创建a时Task,最好的办法是设计你的API以返回a Func<Task>或Func<Task<TResult>>.我做到了这一点.
不幸的是,我的程序在安排第一组并发任务后挂起.假设我的任务仅限于4度并发.在这种情况下,将启动4个任务,然后程序将挂起.任务永远不会完成.
我创建了一个简单的例子来简单地说明问题.我正在使用文件读取而不是使用WebRequest.我将并发度限制为1.
class Program
{
static Func<Task> GetReadTask()
{
return () =>
{
Console.WriteLine("Opening file.");
FileStream fileStream = File.Open("C:\\Users\\Joel\\Desktop\\1.txt", FileMode.Open);
byte[] buffer = new byte[32];
Console.WriteLine("Beginning read.");
return Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null).ContinueWith(task => fileStream.Close());
};
}
static void Main()
{
LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(ts);
int[] range = {1, 2, 3};
var tasks = range.Select(number =>
{
Func<Task> getTask = GetReadTask();
return factory.StartNew(() =>
{
var task = getTask();
task.Wait();
});
});
Task.WaitAll(tasks.ToArray());
}
}
Run Code Online (Sandbox Code Playgroud)
为了澄清"挂起"我的意思,这就是输出的样子.
Opening file.
Beginning read.
Run Code Online (Sandbox Code Playgroud)
然后没有别的东西被打印出来......永远.
有关正在发生的事情的任何线索?
好问题!
首先,我不确定这LimitedConcurrencyLevelTaskScheduler是学术上正确的解决方案.为了限制N的并发请求数,您必须阻止N个任务,这些任务首先会破坏使用APM异步调用的目的.
话虽如此,实施起来比替代方案容易得多.您需要有一个工作队列并记录飞行请求的数量,然后根据需要创建工作任务.要做到这一点并非易事,如果并发请求的数量N很小,那么N个被阻塞的线程就不是世界末日.
因此,代码的问题在于,在其他任务中创建的任务使用父任务中的调度程序.实际上,对于FromAsync使用底层APM实现创建的任务不是这样,因此有点不同.
您使用以下方法创建任务Main:
return factory.StartNew( () =>
{
var task = getTask();
task.Wait();
}
);
Run Code Online (Sandbox Code Playgroud)
factory使用LimitedConcurrencyLevelTaskScheduler( 1 ),所以这些任务中只有一个可以同时执行,而一个正在等待返回的任务getTask().
所以,在GetReadTask你打电话Task<int>.Factory.FromAsync.这样运行是因为FromAsync不尊重父任务的调度程序.
然后你创建一个延续.ContinueWith(task => fileStream.Close()).这会创建一个尊重其父级调度程序的任务.由于LimitedConcurrencyLevelTaskScheduler已经执行了一个任务(Main被阻止的任务),因此延迟无法运行并且您有死锁.
解决方案是在正常的线程池线程上运行continuation TaskScheduler.Default.然后它运行并且死锁被破坏.
这是我的解决方案:
static Task QueueReadTask( TaskScheduler ts, int number )
{
Output.Write( "QueueReadTask( " + number + " )" );
return Task.Factory.StartNew( () =>
{
Output.Write( "Opening file " + number + "." );
FileStream fileStream = File.Open( "D:\\1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read );
byte[] buffer = new byte[ 32 ];
var tRead = Task<int>.Factory.FromAsync( fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null );
var tClose = tRead.ContinueWith( task =>
{
Output.Write( "Closing file " + number + ". Read " + task.Result + " bytes." );
fileStream.Close();
}
, TaskScheduler.Default
);
tClose.Wait();
}
, CancellationToken.None
, TaskCreationOptions.None
, ts
);
}
Run Code Online (Sandbox Code Playgroud)
而Main现在看起来是这样的:
static void Main()
{
LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler( 1 );
int[] range = { 1, 2, 3 };
var tasks = range.Select( number =>
{
var task = QueueReadTask( ts, number );
return task.ContinueWith( t => Output.Write( "Number " + number + " completed" ) );
}
)
.ToArray();
Output.Write( "Waiting for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
Task.WaitAll( tasks );
Output.Write( "WaitAll complete for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
}
Run Code Online (Sandbox Code Playgroud)
有几点需要注意:
移动task.Wait()进入QueueReadTask使您更明显地阻止任务.您可以删除FromAsync呼叫和延续,并将其替换为正常的同步呼叫,因为您无论如何都要阻止.
返回的任务QueueReadTask可以有延续.默认情况下,它们在默认调度程序下运行,因为它们继承父任务的调度程序而不是先行调度程序.在这种情况下,没有父任务,因此使用默认调度程序.
| 归档时间: |
|
| 查看次数: |
2604 次 |
| 最近记录: |