考虑一个包含大量需要处理的作业的队列.队列限制一次只能获得1个工作,无法知道有多少工作.这些作业需要10秒才能完成,并且需要大量等待来自Web服务的响应,因此不受CPU限制.
如果我使用这样的东西
while (true)
{
var job = Queue.PopJob();
if (job == null)
break;
Task.Factory.StartNew(job.Execute);
}
Run Code Online (Sandbox Code Playgroud)
然后,它会以比完成它们更快的速度从队列中快速弹出作业,耗尽内存并堕落.> <
我不能使用(我不认为)ParallelOptions.MaxDegreeOfParallelism因为我不能使用Parallel.Invoke或Parallel.ForEach
我找到了3个替代方案
用.替换Task.Factory.StartNew
Task task = new Task(job.Execute,TaskCreationOptions.LongRunning)
task.Start();
Run Code Online (Sandbox Code Playgroud)
这似乎在某种程度上解决了这个问题,但我不清楚这是做什么的,如果这是最好的方法.
创建一个限制并发度的自定义任务调度程序
使用类似BlockingCollection的东西在启动时将作业添加到集合中,并在完成时删除以限制可以运行的编号.
#1我必须相信自己做出正确的决定,#2 /#3我必须计算出自己可以运行的最大数量的任务.
我是否理解正确 - 这是更好的方式,还是有另一种方式?
编辑 - 这是我从下面的答案,生产者 - 消费者模式中得出的结果.
除了整体吞吐量目标不是要比可以处理的更快地使作业出列并且没有多个线程轮询队列(这里没有显示但是这是非阻塞的操作并且如果从多个地方以高频率轮询将导致巨大的交易成本) .
// BlockingCollection<>(1) will block if try to add more than 1 job to queue (no
// point in being greedy!), or is empty on take.
var BlockingCollection<Job> jobs = …Run Code Online (Sandbox Code Playgroud) 我有一个items(RunData.Demand)的枚举,每个都代表一些涉及通过HTTP调用API的工作.如果我只是foreach通过它并在每次迭代期间调用API,它的工作效果很好.但是,每次迭代需要一两秒钟,所以我想运行2-3个线程并在它们之间划分工作.这是我正在做的事情:
ThreadPool.SetMaxThreads(2, 5); // Trying to limit the amount of threads
var tasks = RunData.Demand
.Select(service => Task.Run(async delegate
{
var availabilityResponse = await client.QueryAvailability(service);
// Do some other stuff, not really important
}));
await Task.WhenAll(tasks);
Run Code Online (Sandbox Code Playgroud)
该client.QueryAvailability调用基本上使用HttpClient该类调用API :
public async Task<QueryAvailabilityResponse> QueryAvailability(QueryAvailabilityMultidayRequest request)
{
var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request);
if (response.IsSuccessStatusCode)
{
return await response.Content.ReadAsAsync<QueryAvailabilityResponse>();
}
throw new HttpException((int) response.StatusCode, response.ReasonPhrase);
}
Run Code Online (Sandbox Code Playgroud)
这种方法很有效,但最终事情开始超时.如果我将HttpClient超时设置为一小时,那么我开始得到奇怪的内部服务器错误.
我开始做的是在QueryAvailability方法中设置秒表以查看发生了什么.
发生的事情是RunData.Demand中的所有1200个项目一次创建,并且await client.PostAsJsonAsync正在调用所有1200个方法.它似乎然后使用2个线程慢慢检查任务,所以最后我有等待9或10分钟的任务.
这是我想要的行为: …
我有一个运行扫描各种服务器的服务.有问题的网络可能很庞大(数十万个网络节点).
该软件的当前版本使用由我们设计的排队/线程架构,其工作但效率不高(尤其是因为作业可能会产生不能很好处理的子节点)
V2即将到来,我正在考虑使用TPL.它似乎应该是理想的选择.
我已经看到了这个问题,答案意味着TPL可以处理的任务没有限制.在我的简单测试中(旋转100,000个任务并将它们交给TPL),TPL很早就开始出现Out-Of-Memory异常(足够公平 - 特别是在我的开发盒上).
扫描需要不同的时间长度,但5分钟/任务是一个很好的平均值.
可以想象,对于庞大网络的扫描可能需要相当长的时间,即使在功能强大的服务器上也是如此.
我已经有了一个框架,允许扫描作业(存储在Db中)在多个扫描服务器之间分配,但问题是我应该如何将工作传递给特定服务器上的TPL.
我可以监控TPL队列的大小吗(如果它低于几百个条目,可以加满)吗?这样做有不利之处吗?
我还需要处理需要暂停扫描的情况.通过不向TPL提供工作比通过取消/重置可能已经部分处理的任务更容易做到这一点.
所有初始任务都可以按任何顺序运行.必须在父项开始执行后运行子项,但由于父项产生它们,这应该不是问题.孩子们可以按任何顺序跑步.因此,我目前正在设想将子任务写回Db而不是直接生成TPL.如果需要,这将允许其他服务器"偷窃".
有没有人以这种方式使用TPL的经验?我需要注意哪些方面的考虑因素?
parallel-processing multithreading .net-4.0 task-parallel-library
我正在尝试尽可能快地向URL发出尽可能多的HTTP请求.
我正在使用这段代码来限制最大的并行度,所以我不会通过Tasks一次性产生大量的内容来溢出内存.
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
Run Code Online (Sandbox Code Playgroud)
这似乎工作正常.
body() 基本归结为:
async Task Body()
{
var r = WebRequest.Create("// the url");
await r.GetResponseAsync();
}
Run Code Online (Sandbox Code Playgroud)
但是,我似乎在某个地方遇到了瓶颈.如果我尝试进行2500迭代,使用不同的值,dop我会得到以下结果:
DOP: 50
Total Time: 00:00:14.4801781
Average (ms): 246.6088
StDev: 84.1327983759009
DOP: 75
Total Time: 00:00:09.8089530
Average (ms): 265.758
StDev: 110.22912244956
DOP: 100
Total …Run Code Online (Sandbox Code Playgroud)