标签: pool

守护进程内的Python多处理池

我针对这个问题提出了一个问题,但没有得到足够彻底的答案来解决该问题(很可能是由于在解释我的问题时缺乏严谨性,这正是我试图纠正的):Zombie process in python multiprocessing daemon

我正在尝试实现一个 python 守护进程,它使用工作池来使用Popen. 我从http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/借用了基本守护进程

我只改变了init, daemonize(或同样的start) 和stop方法。以下是该方法的更改init

def __init__(self, pidfile):
#, stdin='/dev/null', stdout='STDOUT', stderr='STDOUT'):
    #self.stdin = stdin
    #self.stdout = stdout
    #self.stderr = stderr
    self.pidfile = pidfile
    self.pool = Pool(processes=4)
Run Code Online (Sandbox Code Playgroud)

我没有设置 stdin、stdout 和 stderr,以便我可以使用 print 语句调试代码。另外,我尝试将此池移动到几个地方,但这是唯一不会产生异常的地方。

以下是该方法的更改daemonize

def daemonize(self):
    ...

    # redirect standard file descriptors
    #sys.stdout.flush()
    #sys.stderr.flush()
    #si = open(self.stdin, 'r')
    #so = open(self.stdout, 'a+')
    #se = open(self.stderr, 'a+', 0) …
Run Code Online (Sandbox Code Playgroud)

python multithreading pool multiprocessing threadpool

5
推荐指数
1
解决办法
4889
查看次数

使用异步共享 numpy 数组的 python 多处理:池与队列

我希望在规则网格上生成周期性柏林噪声。我需要生成多个地图,并且网格非常大,因此我想使用多重处理,为每个核心生成一个地图。

这些地图将绘制在一个图形上,并一个接一个地放在一个二进制 dat 文件中。这些地图将存储在单个 numpy 数组中,其大小为地图数量*节点数量,因此切片将是一个地图,因此我可以同时访问数组的不同区域而无需担心。

我以这个线程作为参考,它使用一个池,而这个线程,我使用一个队列在多处理中做一些绘图。

我想出了两个代码:带有队列的代码在我自己的计算机上运行良好,但在我实验室的工作站或我的专业笔记本电脑上运行不佳:我没有错误消息,它只是在某个时刻冻结。第二个示例工作得很好,而且我发现它比第一个示例更简单,因为我只是直接写入 numpy 数组。(我不太明白第一个链接的异步情况下所有函数和 init 的需要。)

我的问题是:为什么我的第一个代码有问题?我只将我认为相关的代码放在下面。

感谢您的帮助。

第一次尝试:

def generate_irradiation_maps(rad_v):
    while tasks_queue.empty() == False:
        print("fetching work ...")
        map_index = tasks_queue.get()  # get some work to do from the queue
        print("----> working on map: %s" % map_index)
        perm = range(permsize)
        random.shuffle(perm)
        perm += perm
        for i in range(nb_nodes):
            # call the perlin function: fBm
            rad_v[map_index, i] = fBm(perm, x[i] * freq, y[i] * freq, int(sizex *     freq), int(sizey * freq), octs, …
Run Code Online (Sandbox Code Playgroud)

python numpy pool multiprocessing

5
推荐指数
1
解决办法
2779
查看次数

如何在类中并行化 python 中的 for ?

我有一个 python 函数funz,每次都会返回长度为 p 的不同数组。我需要多次运行该函数,然后计算每个值的平均值。

我可以使用 for 循环来完成此操作,但需要很多次。

我正在尝试使用库多处理,但遇到错误。

import sklearn as sk
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn import preprocessing,linear_model, cross_validation
from scipy import stats
from multiprocessing import Pool


class stabilize(BaseEstimator,TransformerMixin):

    def __init__(self,sim=3,n_folds=3):
        self.sim=sim
        self.n_folds=n_folds

    def fit(self,X,y):
        self.n,self.p=X.shape
        self.X=X
        self.y=y        
        self.beta=np.zeros(shape=(self.sim,self.p))
        self.alpha_min=[]        
        self.mapper=p.map(self.multiple_cv,[1]*self.sim)    

    def multiple_cv(self,o):
        kf=sk.cross_validation.KFold(self.n,n_folds=self.n_folds,shuffle=True)
        cv=sk.linear_model.LassoCV(cv=kf).fit(self.X,self.y)
        beta=cv.coef_
        alpha_min=cv.alpha_
        return alpha_min
Run Code Online (Sandbox Code Playgroud)

我使用虚拟变量 o 来告诉我要使用多少个并行进程。这不是很优雅,也许是错误的一部分。变量 X 和 y 已经是类的一部分,因此我没有参数传递给函数 multiple_cv。

当我运行该程序时,我收到此错误

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing pool multiprocessing

5
推荐指数
1
解决办法
2589
查看次数

Python 多处理池映射和 imap

我有一个可以运行multiprocessing的脚本pool.map。问题是并非所有进程都需要很长时间才能完成,因此某些进程会休眠,因为它们会等到所有进程完成(与此问题相同的问题)。有些文件在不到一秒的时间内完成,其他文件则需要几分钟(或几小时)。

如果我正确理解手册(和这篇文章)pool.imap ,则不会等待所有进程完成,如果完成一个进程,它会提供一个新文件来处理。当我尝试这样做时,脚本正在加速要处理的文件,小文件按预期处理,大文件(需要更多时间处理)直到最后才完成(在没有通知的情况下被杀死?)。这是正常行为pool.imap,还是我需要添加更多命令/参数?time.sleep(100)当我将部分添加else为测试时,它正在处理更大的文件,但其他进程会进入睡眠状态。有什么建议 ?谢谢

def process_file(infile):
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    #nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = []
        for infile in os.listdir():  
            todolist.append(infile)
        try:   
            p = Pool(processes=nprocesses)
            p.imap(process_file, todolist)
        except KeyboardInterrupt:                
            print("Shutting processes down")
           # …
Run Code Online (Sandbox Code Playgroud)

pool cpu-usage multiprocessing python-3.5

5
推荐指数
1
解决办法
6509
查看次数

使用 Jedis Pool 时管道损坏

我正在使用 Jedis 在 Redis 中执行大量插入/读取操作。Redis 服务器使用默认配置。当我开始使用几个线程时出现问题,例外情况是:

redis.clients.jedis.exceptions.JedisConnectionException:java.net.SocketException:管道quebrado(写入失败)

我已经搜索了很多关于这个问题的信息,但找不到原因或解决方案。我用来执行这些测试的代码如下:

public class RedisFacade {

private static RedisFacade instancia = null;
// Initialize the Connection
final JedisPoolConfig poolConfig = buildPoolConfig();
JedisPool pool = new JedisPool(poolConfig, "localhost");
Jedis jedis;
int i = 0;

private RedisFacade() {
}

public static RedisFacade getInstancia() {
    if (instancia == null) {
        instancia = new RedisFacade();
    }
    return instancia;
}

// retorna um cliente jedis da pool
public Jedis getDB() {
    if (jedis == null) {
        jedis = pool.getResource(); …
Run Code Online (Sandbox Code Playgroud)

java pool redis jedis

5
推荐指数
1
解决办法
1万
查看次数

在其他节点 javascript 文件中重用 postgresql 池

我正在使用 postgresql 数据库创建 nodejs 后端应用程序。我想要的是,一旦在 db.js 文件中创建了与数据库的连接,我就可以在其他文件中重用它来执行查询。

这是我的 db.js 文件

const pool = new Pool({
    user: 'us',
    host: 'localhost',
    database: 'db',
    password: 'pass',
    port: 5432,
})
pool.on('connect', () => {
    console.log('connected to the Database');
});
module.exports = () => { return pool; }
Run Code Online (Sandbox Code Playgroud)

这就是我尝试在 index.js 文件中使用它的方法

const db = require('./db.js')

app.get('/', (request, response) => {

    db().query('SELECT * FROM country'), (error, results) => {
        if (error) {
            response.send(error)
        }
        console.log(results)
        response.status(201).send(results)
    }
})
Run Code Online (Sandbox Code Playgroud)

没有任何错误,当我转到这个特定页面时,它会继续加载。控制台中也没有任何内容。

但是,如果我在 db.js 文件中编写一个函数并执行类似的操作pool.query(...),将其导出,然后在我的 index.js 中编写 …

javascript postgresql pool node.js express

5
推荐指数
1
解决办法
2524
查看次数

为什么给我这个错误:TypeError:无法pickle '_io.TextIOWrapper'对象?

我正在尝试使用多重处理,其想法是从 Bing 搜索结果中获取链接,但使用 selenium 更改其中一项配置(cep 配置)。我将所有 cep 都放在列表 (filecep) 中,并且我想将所有结果写入 csv 文件。\n这是我的 getUrlCleans 函数:

\n
def getUrlCleans(search):\n\n\ndriver = webdriver.Firefox()\n\nf = open('out/'+str(date.today())+'.csv','w')\nf.write('url,cep')\nf.write('\\n')\n\nurl_cleans=[] \n\npool=mp.Pool(mp.cpu_count())\npool.starmap(getUrlbyCEP,[(cep,driver,search,f) for cep in filecep])\npool.close()\nf.close()\n
Run Code Online (Sandbox Code Playgroud)\n

这是我的 getUrlbyCEP 函数:

\n
def getUrlbyCEP(cep,driver,search,f):\n\ndriver.get('https://www.bing.com/account/general?ru=https%3a%2f%2fwww.bing.com%2f%3fFORM%3dZ9FD1&FORM=O2HV65#location')\n                \n    \ncepInput = driver.find_element_by_id('geoname')\ncepInput.clear()\ncepInput.send_keys(cep)\ntime.sleep(0.5)\ndriver.execute_script("window.scrollTo(0,document.body.scrollHeight)")\n\n\nsaveButon=driver.find_element_by_id('sv_btn')\nsaveButon.click()\n\n\n\n\ntry:\n    driver.find_element_by_id('geoname')        \n    # continue\nexcept:\n    pass\n\nsearchInput=driver.find_element_by_id('sb_form_q')\nsearchInput.send_keys(search)\n\ndriver.find_element_by_id('sb_form_q').send_keys(Keys.ENTER)\ntime.sleep(0.5)\n\nurl_cleans=[]\n\nfor i in range(2):\n    \n    url_cleans=getLinks(driver,url_cleans)\n    time.sleep(2)\n    driver.find_element_by_xpath('//*[@title="Pr\xc3\xb3xima p\xc3\xa1gina"]').click()\n    url_cleans=getLinks(driver,url_cleans)\n    for u in url_cleans:\n        f.write(u+','+cep)\n        f.write('\\n')\n\n    \n
Run Code Online (Sandbox Code Playgroud)\n

最后我打电话

\n
getUrlCleans('sulamerica')\n
Run Code Online (Sandbox Code Playgroud)\n

ang 它给了我错误......我不知道为什么?

\n

python selenium pool multiprocessing

5
推荐指数
1
解决办法
2万
查看次数

锁对象只能通过继承在进程之间共享

我正在multiprocessing.Pool对象中使用该类并尝试执行以下操作:

from multiprocessing import Lock, Pool

class A:
    def __init__(self):
        self.lock = Lock()
        self.file = open('test.txt')
    def function(self, i):
        self.lock.acquire()
        line = self.file.readline()
        self.lock.release()
        return line
    def anotherfunction(self):
        pool = Pool()
        results = pool.map(self.function, range(10000))
        pool.close()
        pool.join()
        return results
Run Code Online (Sandbox Code Playgroud)

但是,我收到一个运行时错误,指出锁对象只能通过继承在进程之间共享。我对 Python 和多处理相当陌生。我怎样才能走上正确的轨道?

python pool multiprocessing

5
推荐指数
1
解决办法
1万
查看次数

您可以在 Databricks 池节点上预安装库吗?

我们有许多 Python Databricks 作业,它们都使用相同的底层 Wheel 包来安装其依赖项。即使节点已在池中闲置,安装此 Wheel 软件包仍需要 90 秒。

其中一些作业运行时间非常长,因此我们希望使用作业计算机集群来降低 DBU 的成本。

其中一些作业的运行时间要短得多(<10 秒),其中 90 秒的安装时间似乎更重要。我们一直在考虑使用热集群(通用计算)来完成这些较短的作业。如果可能的话,我们希望避免通用计算的额外成本。

阅读 Databricks 文档表明池中的空闲实例是为我们保留的,但不会消耗我们的 DBU。有没有办法让我们在空闲实例上预安装所需的库,以便当作业完成时我们能够立即开始处理它?

是否有替代方法可以满足类似的用例?

python pool cluster-computing azure databricks

5
推荐指数
1
解决办法
2496
查看次数

当另一个协程已经在等待传入数据时调用 readexactly()

使用池连接 - aiomysql

我的代码如下所示:

# POOL CONNECTION

# create pool connection
async def create_pool():
    print("Creating pool connection...")
    global pool

    pool = await aiomysql.create_pool(
        host=DB_HOST,
        port=DB_PORT,
        user=DB_USER,
        password=DB_PASSWORD,
        db=DB_DBNAME,
        autocommit=True
    )


async def get_connection():
    async with pool.acquire() as conn:
        return conn
    pool.close()
    await pool.wait_closed()


connection = await get_connection()
async with connection.cursor() as cursor:
            await cursor.execute(...)
Run Code Online (Sandbox Code Playgroud)

如果发出单个请求,连接到 mysql,它会正确运行,但如果同时发出 2 个或更多请求,则会崩溃并抛出此错误:

当另一个协程已经在等待传入数据时调用 readexactly()

python mysql connection-pooling pool aio-mysql

5
推荐指数
0
解决办法
1193
查看次数