标签: worker-pool

使用Python中的工作池进行异步多处理:如何在超时后继续运行?

我想使用一个进程池运行一些作业并应用一个给定的超时,之后应该杀死一个作业,并替换另一个处理下一个任务的作业.

我试图使用multiprocessing模块提供一种方法来异步运行工作池(例如使用map_async),但在那里我只能设置一个"全局"超时,之后所有进程都将被终止.

是否有可能有一个单独的超时,之后只有一个耗时太长的进程被杀死,而一个新的worker再次被添加到池中(处理下一个任务并跳过超时的那个)?

这是一个简单的例子来说明我的问题:

def Check(n):
  import time
  if n % 2 == 0: # select some (arbitrary) subset of processes
    print "%d timeout" % n
    while 1:
      # loop forever to simulate some process getting stuck
      pass
  print "%d done" % n
  return 0

from multiprocessing import Pool
pool = Pool(processes=4)
result = pool.map_async(Check, range(10))
print result.get(timeout=1)    
Run Code Online (Sandbox Code Playgroud)

超时后,所有工人都被杀死,程序退出.我想继续下一个子任务.我是否必须自己实施此行为或是否存在现有解决方案?

更新

可以杀死悬挂的工人并自动更换.所以我提出了这个代码:

jobs = pool.map_async(Check, range(10))
while 1:
  try:
    print "Waiting …
Run Code Online (Sandbox Code Playgroud)

python timeout multiprocessing worker-pool

6
推荐指数
2
解决办法
8833
查看次数

CircleCI 中的 Mocha 测试显示“Workerpool Worker 意外终止”

我使用 Yarn 和 Mocha 运行 TypeScript 测试,它们在本地运行良好。然而,当我通过 CircleCI 部署时,我得到了这个:

1) Uncaught error outside test suite:
   Uncaught Workerpool Worker terminated Unexpectedly
  exitCode: `null`
  signalCode: `SIGKILL`
  workerpool.script: `/home/circleci/my-project/node_modules/mocha/lib/nodejs/worker.js`
  spawnArgs: `/usr/local/bin/node,--inspect,--inspect=43215,/home/circleci/my-project/node_modules/mocha/lib/nodejs/worker.js`
  spawnfile: `/usr/local/bin/node`
  stdout: `null`
  stderr: `null`

Error: Workerpool Worker terminated Unexpectedly
    exitCode: `null`
    signalCode: `SIGKILL`
    spawnfile: `/usr/local/bin/node`
    stdout: `null`
    stderr: `null`
  
    at ChildProcess.<anonymous> (node_modules/workerpool/src/WorkerHandler.js:294:13)
    at Process.ChildProcess._handle.onexit (internal/child_process.js:282:12)
Run Code Online (Sandbox Code Playgroud)

这是我的 CircleCI 配置。我编辑了一些特定于我的项目的字段,并删除了一些在这里没有意义的部分,因为它们用于我当前无法运行的作业,因为它们稍后会在过程中运行。

version: 2.1

orbs:
  aws-cli: circleci/aws-cli@2.0.6
  assume-role: airswap/assume-role@0.2.0

docker_base: &docker_base
  working_directory: ~/my-funnel  # Edited for privacy
  docker:
    - image: cimg/node:14.18.0
    - image: cimg/openjdk:17.0.1 …
Run Code Online (Sandbox Code Playgroud)

mocha.js typescript circleci worker-pool yarnpkg

5
推荐指数
1
解决办法
1564
查看次数

工作池的最佳大小

我正在构建一个 Go 应用程序,它使用 goroutines 的“工作池”,最初我启动池创建许多工作人员。我想知道多核处理器中的最佳工人数量是多少,例如在具有 4 核的 CPU 中?我目前正在使用以下方法:

    // init pool
    numCPUs := runtime.NumCPU()

    runtime.GOMAXPROCS(numCPUs + 1) // numCPUs hot threads + one for async tasks.
    maxWorkers := numCPUs * 4

    jobQueue := make(chan job.Job)

    module := Module{
        Dispatcher: job.NewWorkerPool(maxWorkers),
        JobQueue:   jobQueue,
        Router:     router,
    }

    // A buffered channel that we can send work requests on.
    module.Dispatcher.Run(jobQueue)
Run Code Online (Sandbox Code Playgroud)

完整的实现在下

job.NewWorkerPool(maxWorkers) 和 module.Dispatcher.Run(jobQueue)

我使用工作池的用例:我有一个服务,它接受请求并调用多个外部 API,并将它们的结果聚合到一个响应中。每个调用都可以独立于其他调用,因为结果的顺序无关紧要。我将调用分派到工作池,其中每个调用都以异步方式在一个可用的 goroutine 中完成。一旦工作线程完成,我的“请求”线程就会在获取和聚合结果的同时继续监听返回通道。完成所有操作后,最终聚合结果将作为响应返回。由于每个外部 API 调用可能呈现可变响应时间,因此某些调用可以比其他调用更早完成。

parallel-processing optimization go goroutine worker-pool

3
推荐指数
1
解决办法
3650
查看次数