我听说任务并行库可以在.Net 3.5项目中使用.这是否正确,如果是,我该如何使用它?在.Net 4.0中,它驻留在System.Threading中,但是当我在Visual Studio 2010中选择.Net 3.5作为目标时,我无法访问Parallel和Parallel循环等类.
我有一个200个命令的列表/队列,我需要在Linux服务器上的shell中运行.
我只希望一次最多运行10个进程(从队列中).某些流程需要几秒钟才能完成,其他流程需要更长时间.
当进程完成时,我希望从队列中"弹出"下一个命令并执行.
有没有人有代码来解决这个问题?
进一步阐述:
在某种队列中需要完成200件工作.我希望一次最多可以完成10件工作.当一个线程完成一项工作时,它应该要求队列进行下一项工作.如果队列中没有其他工作,则线程应该死掉.当所有线程都已经死亡时,意味着所有工作都已完成.
我正在尝试解决的实际问题是使用imapsync将旧邮件服务器中的200个邮箱同步到新邮件服务器.某些用户拥有大型邮箱并需要很长时间才能同步,其他用户拥有非常小的邮箱并可以快速同步.
是否有可用于在Windows批处理文件中并行执行多个进程的工具?我找到了一些有趣的Linux工具(并行和PPSS),但是,我需要一个适用于Windows平台的工具.
额外奖励:如果该工具还允许在多台机器之间轻松分配流程,并通过PsExec远程运行流程,那将会很棒.
示例:我希望在以下for循环中使用它
for %F in (*.*) do processFile.exe %F
Run Code Online (Sandbox Code Playgroud)
有限数量的processFile.exe实例并行运行以利用多核CPU.
寻找有关利用AsParallel()或Parallel.ForEach()加快这一点的一些建议.
请参阅下面的方法(本例中简化/标准化).
它需要一个像"US,FR,APAC"这样的列表,其中"APAC"是50个其他"US,FR,JP,IT,GB"等的别名.该方法应采用"US,FR,APAC",并将其转换为"US","FR"列表以及"APAC"中的所有国家/地区.
private IEnumerable<string> Countries (string[] countriesAndAliases)
{
var countries = new List<string>();
foreach (var countryOrAlias in countriesAndAliases)
{
if (IsCountryNotAlias(countryOrAlias))
{
countries.Add(countryOrAlias);
}
else
{
foreach (var aliasCountry in AliasCountryLists[countryOrAlias])
{
countries.Add(aliasCountry);
}
}
}
return countries.Distinct();
}
Run Code Online (Sandbox Code Playgroud)
是否将其并行化为将其更改为以下内容?使用AsParallel()比这更细微吗?我应该使用Parallel.ForEach()而不是foreach?并行化foreach循环时我应该使用哪些经验法则?
private IEnumerable<string> Countries (string[] countriesAndAliases)
{
var countries = new List<string>();
foreach (var countryOrAlias in countriesAndAliases.AsParallel())
{
if (IsCountryNotAlias(countryOrAlias))
{
countries.Add(countryOrAlias);
}
else
{
foreach (var aliasCountry …Run Code Online (Sandbox Code Playgroud) 列表推导和地图计算都应该 - 至少在理论上 - 相对容易并行化:列表理解中的每个计算都可以独立于所有其他元素的计算来完成.例如在表达式中
[ x*x for x in range(1000) ]
Run Code Online (Sandbox Code Playgroud)
每个x*x-计算可以(至少在理论上)并行完成.
我的问题是:是否有任何Python-Module/Python-Implementation/Python Programming-Trick来并行化列表理解计算(为了使用所有16/32/...核心或通过计算机网格分配计算或在云上)?
对于每个正常的foreach使用parallel.foreach循环是否有意义?
我什么时候应该开始使用parallel.foreach,只迭代1,000,000个项目?
据我所知ForkJoinPool,该池创建了固定数量的线程(默认值:核心数),并且永远不会创建更多线程(除非应用程序通过使用表明需要这些线程managedBlock).
但是,使用ForkJoinPool.getPoolSize()我发现在创建30,000个任务(RecursiveAction)的程序中,ForkJoinPool执行这些任务平均使用700个线程(每次创建任务时计算的线程数).任务不做I/O,而是纯粹的计算; 唯一的任务间同步是调用ForkJoinTask.join()和访问AtomicBooleans,即没有线程阻塞操作.
因为join()不会像我理解的那样阻塞调用线程,所以没有理由为什么池中的任何线程都应该阻塞,所以(我曾经假设)应该没有理由创建任何进一步的线程(这显然发生了) .
那么,为什么要ForkJoinPool创建这么多线程呢?哪些因素决定了创建的线程数?
我曾希望这个问题可以在不发布代码的情况下得到解答,但在此请求.此代码摘自四倍大小的程序,简化为必要部分; 它不会按原样编译.如果需要,我当然也可以发布完整的程序.
程序使用深度优先搜索在迷宫中搜索从给定起点到给定终点的路径.保证存在解决方案.主要逻辑在以下compute()方法中SolverTask:A RecursiveAction从某个给定点开始,并继续从当前点可到达的所有邻居点.它不是SolverTask在每个分支点创建一个新的(这将创建太多的任务),而是将除了一个之外的所有邻居推送到后退堆栈以便稍后处理,并继续只有一个邻居没有被推送到堆栈.一旦它以这种方式达到死胡同,就会弹出最近推到回溯堆栈的点,并从那里继续搜索(相应地减少从taks起点构建的路径).一旦任务发现其回溯堆栈大于某个阈值,就会创建一个新任务; 从那时起,任务在继续从其回溯堆栈中弹出直到耗尽时,在到达分支点时不会将任何其他点推到其堆栈,而是为每个这样的点创建一个新任务.因此,可以使用堆栈限制阈值来调整任务的大小.
我上面引用的数字("30,000个任务,平均700个线程")来自于搜索5000x5000个单元格的迷宫.所以,这是基本代码:
class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000; …Run Code Online (Sandbox Code Playgroud) 如果我使用parSapplythen 调用函数print,message或者cat该函数内的语句似乎没有输出到控制台.
我的过程需要很长时间,因此我需要一些方法来查看进度并在结束时获得结果输出.是否有任何特殊命令可以让我从并行进程打印到控制台?
例:
library(parallel)
oneloop = function(x) {
for(i in 1:50) {
a = rnorm(100000)
a = sort(a)
}
print(x)
message(x)
cat(x)
}
cl <- makeCluster(5)
output = parSapply(cl, 1:10, oneloop)
stopCluster(cl)
Run Code Online (Sandbox Code Playgroud) 我有一个大型数据帧(几百万行).
我希望能够对它进行groupby操作,但只需按任意连续(最好是相等大小)的行子集进行分组,而不是使用各行的任何特定属性来决定它们去哪个组.
用例:我想通过IPython中的并行映射将函数应用于每一行.哪个行转到哪个后端引擎并不重要,因为该函数一次基于一行计算结果.(从概念上讲,至少;实际上它是矢量化的.)
我想出了这样的事情:
# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)
# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]
# Process chunks in parallel
results = dview.map_sync(my_function, groups)
Run Code Online (Sandbox Code Playgroud)
但这似乎很啰嗦,并不能保证大小相等.特别是如果索引是稀疏的或非整数的或其他什么.
有什么更好的方法吗?
谢谢!