我针对这个问题提出了一个问题,但没有得到足够彻底的答案来解决该问题(很可能是由于在解释我的问题时缺乏严谨性,这正是我试图纠正的):Zombie process in python multiprocessing daemon
我正在尝试实现一个 python 守护进程,它使用工作池来使用Popen
. 我从http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/借用了基本守护进程
我只改变了init
, daemonize
(或同样的start
) 和stop
方法。以下是该方法的更改init
:
def __init__(self, pidfile):
#, stdin='/dev/null', stdout='STDOUT', stderr='STDOUT'):
#self.stdin = stdin
#self.stdout = stdout
#self.stderr = stderr
self.pidfile = pidfile
self.pool = Pool(processes=4)
Run Code Online (Sandbox Code Playgroud)
我没有设置 stdin、stdout 和 stderr,以便我可以使用 print 语句调试代码。另外,我尝试将此池移动到几个地方,但这是唯一不会产生异常的地方。
以下是该方法的更改daemonize
:
def daemonize(self):
...
# redirect standard file descriptors
#sys.stdout.flush()
#sys.stderr.flush()
#si = open(self.stdin, 'r')
#so = open(self.stdout, 'a+')
#se = open(self.stderr, 'a+', 0) …
Run Code Online (Sandbox Code Playgroud) 我希望在规则网格上生成周期性柏林噪声。我需要生成多个地图,并且网格非常大,因此我想使用多重处理,为每个核心生成一个地图。
这些地图将绘制在一个图形上,并一个接一个地放在一个二进制 dat 文件中。这些地图将存储在单个 numpy 数组中,其大小为地图数量*节点数量,因此切片将是一个地图,因此我可以同时访问数组的不同区域而无需担心。
我以这个线程作为参考,它使用一个池,而这个线程,我使用一个队列在多处理中做一些绘图。
我想出了两个代码:带有队列的代码在我自己的计算机上运行良好,但在我实验室的工作站或我的专业笔记本电脑上运行不佳:我没有错误消息,它只是在某个时刻冻结。第二个示例工作得很好,而且我发现它比第一个示例更简单,因为我只是直接写入 numpy 数组。(我不太明白第一个链接的异步情况下所有函数和 init 的需要。)
我的问题是:为什么我的第一个代码有问题?我只将我认为相关的代码放在下面。
感谢您的帮助。
第一次尝试:
def generate_irradiation_maps(rad_v):
while tasks_queue.empty() == False:
print("fetching work ...")
map_index = tasks_queue.get() # get some work to do from the queue
print("----> working on map: %s" % map_index)
perm = range(permsize)
random.shuffle(perm)
perm += perm
for i in range(nb_nodes):
# call the perlin function: fBm
rad_v[map_index, i] = fBm(perm, x[i] * freq, y[i] * freq, int(sizex * freq), int(sizey * freq), octs, …
Run Code Online (Sandbox Code Playgroud) 我有一个 python 函数funz
,每次都会返回长度为 p 的不同数组。我需要多次运行该函数,然后计算每个值的平均值。
我可以使用 for 循环来完成此操作,但需要很多次。
我正在尝试使用库多处理,但遇到错误。
import sklearn as sk
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn import preprocessing,linear_model, cross_validation
from scipy import stats
from multiprocessing import Pool
class stabilize(BaseEstimator,TransformerMixin):
def __init__(self,sim=3,n_folds=3):
self.sim=sim
self.n_folds=n_folds
def fit(self,X,y):
self.n,self.p=X.shape
self.X=X
self.y=y
self.beta=np.zeros(shape=(self.sim,self.p))
self.alpha_min=[]
self.mapper=p.map(self.multiple_cv,[1]*self.sim)
def multiple_cv(self,o):
kf=sk.cross_validation.KFold(self.n,n_folds=self.n_folds,shuffle=True)
cv=sk.linear_model.LassoCV(cv=kf).fit(self.X,self.y)
beta=cv.coef_
alpha_min=cv.alpha_
return alpha_min
Run Code Online (Sandbox Code Playgroud)
我使用虚拟变量 o 来告诉我要使用多少个并行进程。这不是很优雅,也许是错误的一部分。变量 X 和 y 已经是类的一部分,因此我没有参数传递给函数 multiple_cv。
当我运行该程序时,我收到此错误
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line …
Run Code Online (Sandbox Code Playgroud) 我有一个可以运行multiprocessing
的脚本pool.map
。问题是并非所有进程都需要很长时间才能完成,因此某些进程会休眠,因为它们会等到所有进程完成(与此问题相同的问题)。有些文件在不到一秒的时间内完成,其他文件则需要几分钟(或几小时)。
如果我正确理解手册(和这篇文章)pool.imap
,则不会等待所有进程完成,如果完成一个进程,它会提供一个新文件来处理。当我尝试这样做时,脚本正在加速要处理的文件,小文件按预期处理,大文件(需要更多时间处理)直到最后才完成(在没有通知的情况下被杀死?)。这是正常行为pool.imap
,还是我需要添加更多命令/参数?time.sleep(100)
当我将部分添加else
为测试时,它正在处理更大的文件,但其他进程会进入睡眠状态。有什么建议 ?谢谢
def process_file(infile):
#read infile
#compare things in infile
#acquire Lock, save things in outfile, release Lock
#delete infile
def main():
#nprocesses = 8
global filename
pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
for d in pathlist:
os.chdir(d)
todolist = []
for infile in os.listdir():
todolist.append(infile)
try:
p = Pool(processes=nprocesses)
p.imap(process_file, todolist)
except KeyboardInterrupt:
print("Shutting processes down")
# …
Run Code Online (Sandbox Code Playgroud) 我正在使用 Jedis 在 Redis 中执行大量插入/读取操作。Redis 服务器使用默认配置。当我开始使用几个线程时出现问题,例外情况是:
redis.clients.jedis.exceptions.JedisConnectionException:java.net.SocketException:管道quebrado(写入失败)
我已经搜索了很多关于这个问题的信息,但找不到原因或解决方案。我用来执行这些测试的代码如下:
public class RedisFacade {
private static RedisFacade instancia = null;
// Initialize the Connection
final JedisPoolConfig poolConfig = buildPoolConfig();
JedisPool pool = new JedisPool(poolConfig, "localhost");
Jedis jedis;
int i = 0;
private RedisFacade() {
}
public static RedisFacade getInstancia() {
if (instancia == null) {
instancia = new RedisFacade();
}
return instancia;
}
// retorna um cliente jedis da pool
public Jedis getDB() {
if (jedis == null) {
jedis = pool.getResource(); …
Run Code Online (Sandbox Code Playgroud) 我正在使用 postgresql 数据库创建 nodejs 后端应用程序。我想要的是,一旦在 db.js 文件中创建了与数据库的连接,我就可以在其他文件中重用它来执行查询。
这是我的 db.js 文件
const pool = new Pool({
user: 'us',
host: 'localhost',
database: 'db',
password: 'pass',
port: 5432,
})
pool.on('connect', () => {
console.log('connected to the Database');
});
module.exports = () => { return pool; }
Run Code Online (Sandbox Code Playgroud)
这就是我尝试在 index.js 文件中使用它的方法
const db = require('./db.js')
app.get('/', (request, response) => {
db().query('SELECT * FROM country'), (error, results) => {
if (error) {
response.send(error)
}
console.log(results)
response.status(201).send(results)
}
})
Run Code Online (Sandbox Code Playgroud)
没有任何错误,当我转到这个特定页面时,它会继续加载。控制台中也没有任何内容。
但是,如果我在 db.js 文件中编写一个函数并执行类似的操作pool.query(...)
,将其导出,然后在我的 index.js 中编写 …
我正在尝试使用多重处理,其想法是从 Bing 搜索结果中获取链接,但使用 selenium 更改其中一项配置(cep 配置)。我将所有 cep 都放在列表 (filecep) 中,并且我想将所有结果写入 csv 文件。\n这是我的 getUrlCleans 函数:
\ndef getUrlCleans(search):\n\n\ndriver = webdriver.Firefox()\n\nf = open('out/'+str(date.today())+'.csv','w')\nf.write('url,cep')\nf.write('\\n')\n\nurl_cleans=[] \n\npool=mp.Pool(mp.cpu_count())\npool.starmap(getUrlbyCEP,[(cep,driver,search,f) for cep in filecep])\npool.close()\nf.close()\n
Run Code Online (Sandbox Code Playgroud)\n这是我的 getUrlbyCEP 函数:
\ndef getUrlbyCEP(cep,driver,search,f):\n\ndriver.get('https://www.bing.com/account/general?ru=https%3a%2f%2fwww.bing.com%2f%3fFORM%3dZ9FD1&FORM=O2HV65#location')\n \n \ncepInput = driver.find_element_by_id('geoname')\ncepInput.clear()\ncepInput.send_keys(cep)\ntime.sleep(0.5)\ndriver.execute_script("window.scrollTo(0,document.body.scrollHeight)")\n\n\nsaveButon=driver.find_element_by_id('sv_btn')\nsaveButon.click()\n\n\n\n\ntry:\n driver.find_element_by_id('geoname') \n # continue\nexcept:\n pass\n\nsearchInput=driver.find_element_by_id('sb_form_q')\nsearchInput.send_keys(search)\n\ndriver.find_element_by_id('sb_form_q').send_keys(Keys.ENTER)\ntime.sleep(0.5)\n\nurl_cleans=[]\n\nfor i in range(2):\n \n url_cleans=getLinks(driver,url_cleans)\n time.sleep(2)\n driver.find_element_by_xpath('//*[@title="Pr\xc3\xb3xima p\xc3\xa1gina"]').click()\n url_cleans=getLinks(driver,url_cleans)\n for u in url_cleans:\n f.write(u+','+cep)\n f.write('\\n')\n\n \n
Run Code Online (Sandbox Code Playgroud)\n最后我打电话
\ngetUrlCleans('sulamerica')\n
Run Code Online (Sandbox Code Playgroud)\nang 它给了我错误......我不知道为什么?
\n我正在multiprocessing.Pool
对象中使用该类并尝试执行以下操作:
from multiprocessing import Lock, Pool
class A:
def __init__(self):
self.lock = Lock()
self.file = open('test.txt')
def function(self, i):
self.lock.acquire()
line = self.file.readline()
self.lock.release()
return line
def anotherfunction(self):
pool = Pool()
results = pool.map(self.function, range(10000))
pool.close()
pool.join()
return results
Run Code Online (Sandbox Code Playgroud)
但是,我收到一个运行时错误,指出锁对象只能通过继承在进程之间共享。我对 Python 和多处理相当陌生。我怎样才能走上正确的轨道?
我们有许多 Python Databricks 作业,它们都使用相同的底层 Wheel 包来安装其依赖项。即使节点已在池中闲置,安装此 Wheel 软件包仍需要 90 秒。
其中一些作业运行时间非常长,因此我们希望使用作业计算机集群来降低 DBU 的成本。
其中一些作业的运行时间要短得多(<10 秒),其中 90 秒的安装时间似乎更重要。我们一直在考虑使用热集群(通用计算)来完成这些较短的作业。如果可能的话,我们希望避免通用计算的额外成本。
阅读 Databricks 文档表明池中的空闲实例是为我们保留的,但不会消耗我们的 DBU。有没有办法让我们在空闲实例上预安装所需的库,以便当作业完成时我们能够立即开始处理它?
是否有替代方法可以满足类似的用例?
使用池连接 - aiomysql
我的代码如下所示:
# POOL CONNECTION
# create pool connection
async def create_pool():
print("Creating pool connection...")
global pool
pool = await aiomysql.create_pool(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
db=DB_DBNAME,
autocommit=True
)
async def get_connection():
async with pool.acquire() as conn:
return conn
pool.close()
await pool.wait_closed()
connection = await get_connection()
async with connection.cursor() as cursor:
await cursor.execute(...)
Run Code Online (Sandbox Code Playgroud)
如果发出单个请求,连接到 mysql,它会正确运行,但如果同时发出 2 个或更多请求,则会崩溃并抛出此错误:
当另一个协程已经在等待传入数据时调用 readexactly()
pool ×10
python ×7
aio-mysql ×1
azure ×1
cpu-usage ×1
databricks ×1
express ×1
java ×1
javascript ×1
jedis ×1
mysql ×1
node.js ×1
numpy ×1
postgresql ×1
python-3.5 ×1
redis ×1
selenium ×1
threadpool ×1