我想使用一个进程池运行一些作业并应用一个给定的超时,之后应该杀死一个作业,并替换另一个处理下一个任务的作业.
我试图使用multiprocessing模块提供一种方法来异步运行工作池(例如使用map_async),但在那里我只能设置一个"全局"超时,之后所有进程都将被终止.
是否有可能有一个单独的超时,之后只有一个耗时太长的进程被杀死,而一个新的worker再次被添加到池中(处理下一个任务并跳过超时的那个)?
这是一个简单的例子来说明我的问题:
def Check(n):
import time
if n % 2 == 0: # select some (arbitrary) subset of processes
print "%d timeout" % n
while 1:
# loop forever to simulate some process getting stuck
pass
print "%d done" % n
return 0
from multiprocessing import Pool
pool = Pool(processes=4)
result = pool.map_async(Check, range(10))
print result.get(timeout=1)
Run Code Online (Sandbox Code Playgroud)
超时后,所有工人都被杀死,程序退出.我想继续下一个子任务.我是否必须自己实施此行为或是否存在现有解决方案?
可以杀死悬挂的工人并自动更换.所以我提出了这个代码:
jobs = pool.map_async(Check, range(10))
while 1:
try:
print "Waiting …Run Code Online (Sandbox Code Playgroud) 我使用 Yarn 和 Mocha 运行 TypeScript 测试,它们在本地运行良好。然而,当我通过 CircleCI 部署时,我得到了这个:
1) Uncaught error outside test suite:
Uncaught Workerpool Worker terminated Unexpectedly
exitCode: `null`
signalCode: `SIGKILL`
workerpool.script: `/home/circleci/my-project/node_modules/mocha/lib/nodejs/worker.js`
spawnArgs: `/usr/local/bin/node,--inspect,--inspect=43215,/home/circleci/my-project/node_modules/mocha/lib/nodejs/worker.js`
spawnfile: `/usr/local/bin/node`
stdout: `null`
stderr: `null`
Error: Workerpool Worker terminated Unexpectedly
exitCode: `null`
signalCode: `SIGKILL`
spawnfile: `/usr/local/bin/node`
stdout: `null`
stderr: `null`
at ChildProcess.<anonymous> (node_modules/workerpool/src/WorkerHandler.js:294:13)
at Process.ChildProcess._handle.onexit (internal/child_process.js:282:12)
Run Code Online (Sandbox Code Playgroud)
这是我的 CircleCI 配置。我编辑了一些特定于我的项目的字段,并删除了一些在这里没有意义的部分,因为它们用于我当前无法运行的作业,因为它们稍后会在过程中运行。
version: 2.1
orbs:
aws-cli: circleci/aws-cli@2.0.6
assume-role: airswap/assume-role@0.2.0
docker_base: &docker_base
working_directory: ~/my-funnel # Edited for privacy
docker:
- image: cimg/node:14.18.0
- image: cimg/openjdk:17.0.1 …Run Code Online (Sandbox Code Playgroud) 我正在构建一个 Go 应用程序,它使用 goroutines 的“工作池”,最初我启动池创建许多工作人员。我想知道多核处理器中的最佳工人数量是多少,例如在具有 4 核的 CPU 中?我目前正在使用以下方法:
// init pool
numCPUs := runtime.NumCPU()
runtime.GOMAXPROCS(numCPUs + 1) // numCPUs hot threads + one for async tasks.
maxWorkers := numCPUs * 4
jobQueue := make(chan job.Job)
module := Module{
Dispatcher: job.NewWorkerPool(maxWorkers),
JobQueue: jobQueue,
Router: router,
}
// A buffered channel that we can send work requests on.
module.Dispatcher.Run(jobQueue)
Run Code Online (Sandbox Code Playgroud)
完整的实现在下
job.NewWorkerPool(maxWorkers) 和 module.Dispatcher.Run(jobQueue)
我使用工作池的用例:我有一个服务,它接受请求并调用多个外部 API,并将它们的结果聚合到一个响应中。每个调用都可以独立于其他调用,因为结果的顺序无关紧要。我将调用分派到工作池,其中每个调用都以异步方式在一个可用的 goroutine 中完成。一旦工作线程完成,我的“请求”线程就会在获取和聚合结果的同时继续监听返回通道。完成所有操作后,最终聚合结果将作为响应返回。由于每个外部 API 调用可能呈现可变响应时间,因此某些调用可以比其他调用更早完成。
worker-pool ×3
circleci ×1
go ×1
goroutine ×1
mocha.js ×1
optimization ×1
python ×1
timeout ×1
typescript ×1
yarnpkg ×1