我是 python 中多处理概念的新手,当我尝试在我的代码中包含多处理时,我在访问变量时遇到了问题。对不起,如果我听起来很天真,但我就是想不通。下面是我的场景的一个简单版本。
class Data:
def __init__(self):
self.data = "data"
def datameth(self):
print self.data
print mainvar
class First:
def __init__(self):
self.first = "first"
def firstmeth(self):
d = Data()
d.datameth()
print self.first
def mymethod():
f = First()
f.firstmeth()
if __name__ == '__main__':
mainvar = "mainvar"
mymethod()
Run Code Online (Sandbox Code Playgroud)
当我运行它时,它运行良好并给出输出:
data
mainvar
first
Run Code Online (Sandbox Code Playgroud)
但是当我尝试mymethod()作为一个进程运行时
from multiprocessing import Process
class Data:
def __init__(self):
self.data = "data"
def datameth(self):
print self.data
print mainvar
class First:
def __init__(self):
self.first = "first"
def firstmeth(self):
d = …Run Code Online (Sandbox Code Playgroud) python class multiprocessing python-2.7 python-multiprocessing
I have a function that writes the content of df into csv file.
def writeToCSV(outDf, defFile, toFile, retainFlag=True, delim='\t', quotechar='"'):
headers = []
fid = open(defFile, 'r')
for line in fid:
headers.append(line.replace('\r','').split('\n')[0].split('\t')[0])
df = pd.DataFrame([], columns=headers)
for header in outDf.columns.values:
if header in headers:
df[header] = outDf[header]
df.to_csv(toFile, sep=delim, quotechar=quotechar, index=False, encoding='utf-8')
Run Code Online (Sandbox Code Playgroud)
How can i parallelize this process? Currently i am using following code
def writeToSchemaParallel(outDf, defFile, toFile, retainFlag=True, delim='\t', quotechar='"'):
logInfo('Start writingtoSchema in parallel...', 'track')
headers = []
fid …Run Code Online (Sandbox Code Playgroud) 我有一个 18Gb 的 pickle 文件,我需要跨进程访问它。我尝试使用
from multiprocessing import Manager
import cPickle as pkl
manager = Manager()
data = manager.dict(pkl.load(open("xyz.pkl","rb")))
Run Code Online (Sandbox Code Playgroud)
但是,我遇到以下问题:
IOError: [Errno 11] Resource temporarily unavailable
Run Code Online (Sandbox Code Playgroud)
有人建议这可能是因为套接字超时,但它似乎不是因为增加超时没有帮助。我该怎么做。还有其他有效的跨进程共享数据的方法吗?
问题:检查超过 1000 个 url 的列表并获取 url 返回码 (status_code)。
我的脚本有效,但速度很慢。
我认为必须有一种更好的、pythonic(更漂亮)的方式来做到这一点,在那里我可以产生 10 或 20 个线程来检查 url 并收集响应。(IE:
200 -> www.yahoo.com
404 -> www.badurl.com
...
Run Code Online (Sandbox Code Playgroud)
www.example.com
www.yahoo.com
www.testsite.com
Run Code Online (Sandbox Code Playgroud)
....
import requests
with open("url10.txt") as f:
urls = f.read().splitlines()
print(urls)
for url in urls:
url = 'http://'+url #Add http:// to each url (there has to be a better way to do this)
try:
resp = requests.get(url, timeout=1)
print(len(resp.content), '->', resp.status_code, '->', resp.url)
except Exception as e:
print("Error", url)
Run Code Online (Sandbox Code Playgroud)
挑战: 通过多处理提高速度。
但它不工作。我收到以下错误:(注意:我不确定我是否正确实现了这一点) …
python multithreading multiprocessing python-multiprocessing
我有一个像下面这样的代码段
pool = multiprocessing.Pool(10)
for i in range(300):
for m in range(500):
data = do_some_calculation(resource)
pool.apply_async(paralized_func, data, call_back=update_resource)
# need to wait for all processes finish
# {...}
# Summarize resource
do_something_with_resource(resource)
Run Code Online (Sandbox Code Playgroud)
所以基本上我有2个循环。我在循环外初始化进程池以避免过热。在第二个循环结束时,我想总结所有过程的结果。
问题是我不能pool.map()因为data输入的变化而等待。我不能使用pool.join()andpool.close()或者因为我仍然需要pool在第一个循环的下一次迭代中使用。
在这种情况下等待进程完成的好方法是什么?
我尝试在第二个循环结束时检查 pool._cache。
while len(process_pool._cache) > 0:
sleep(0.001)
Run Code Online (Sandbox Code Playgroud)
这种方式有效,但看起来很奇怪。有一个更好的方法吗?
如何同时运行多个python文件?有 3 个文件:bot_1.py、bot_2.py、bot_3.py。我想同时运行它们。我附上了代码。我应该在工作函数中写什么来使这个脚本工作?我将不胜感激任何帮助。
import multiprocessing
import subprocess
def worker(file):
#your subprocess code
subprocess.Popen(['screen', './bot_1.py'])
subprocess.Popen(['screen', './bot_2.py'])
subprocess.Popen(['screen', './bot_3.py'])
if __name__ == '__main__':
files = ["bot_1.py","bot_2.py","bot_3.py"]
for i in files:
p = multiprocessing.Process(target=worker(i))
p.start()Run Code Online (Sandbox Code Playgroud)
我在下面有一个简单的代码片段来演示这个问题。
from multiprocessing import Pipe
import time
recv_end, send_end = Pipe(duplex=False)
d = {'word'+str(elem): elem for elem in range(3000)}
start_time = time.time()
send_end.send(d)
print('--- %s seconds ---' % (time.time()-start_time))
Run Code Online (Sandbox Code Playgroud)
以上工作正常,对我的目的来说足够快,没问题。但是如果我将大小设置为 5000,它就会无限期地挂起:
from multiprocessing import Pipe
import time
recv_end, send_end = Pipe(duplex=False)
d = {'word'+str(elem): elem for elem in range(5000)} # changed to 5000
start_time = time.time()
send_end.send(d)
print('--- %s seconds ---' % (time.time()-start_time))
Run Code Online (Sandbox Code Playgroud)
管道是否有大小限制,或者这是一个不可重现的问题?如果你把尺寸做得更大呢?如果有大小限制,避免这个问题并通过管道发送大字典的最佳方法是什么?提前致谢!
我正在编写一个程序,该程序生成一个进程并在某些条件下重新启动该进程。例如,如果子进程在一段时间内不再向母进程发送数据,我希望母进程终止子进程并重新启动它。我以为我可以使用线程从子进程接收数据并重新启动子进程,但它不像我想的那样工作。
import numpy as np
import multiprocessing as mp
import threading
import time
from apscheduler.schedulers.background import BackgroundScheduler
pipe_in, pipe_out = mp.Pipe()
class Mother():
def __init__(self):
self.pipe_out = pipe_out
self.proc = mp.Process(target = self.test_func, args=(pipe_in, ))
self.proc.start()
self.thread = threading.Thread(target=self.thread_reciever, args=(self.pipe_out, ))
self.thread.start()
def thread_reciever(self, pipe_out):
while True:
value = pipe_out.recv()
print(value)
if value == 5:
self.proc.terminate()
time.sleep(2)
self.proc = mp.Process(target = self.test_func)
self.proc.start()
def test_func(self, pipe_in):
for i in range(10):
pipe_in.send(i)
time.sleep(1)
if __name__ == '__main__':
r = Mother()
Run Code Online (Sandbox Code Playgroud)
它打印出这个错误。 …
python windows multithreading multiprocessing python-multiprocessing
我想用来multiprocessing.Pool加载大型数据集,这是我正在使用的代码:
import os
from os import listdir
import pickle
from os.path import join
import multiprocessing as mp
db_path = db_path
the_files = listdir(db_path)
fp_dict = {}
def loader(the_hash):
global fp_dict
the_file = join(db_path, the_hash)
with open(the_file, 'rb') as source:
fp_dict[the_hash] = pickle.load(source)
print(len(fp_dict))
def parallel(the_func, the_args):
global fp_dict
pool = mp.Pool(mp.cpu_count())
pool.map(the_func, the_args)
print(len(fp_dict))
parallel(loader, the_files)
Run Code Online (Sandbox Code Playgroud)
有趣的fp_dict是,当代码运行时, 的长度会发生变化。但是,只要进程终止,长度fp_dict就为零。为什么?如何使用 修改全局变量multiprocessing.Pool?
pyhton ProcessPoolExecutor 在公共行中工作,但在添加到函数后不运行
它是这样工作的
from concurrent import futures
def multi_process(func, paras, threads):
with futures.ProcessPoolExecutor(max_workers=threads) as pool:
res = pool.map(func, paras, chunksize=threads)
return list(res)
p = multi_process(func,paras,threads)
Run Code Online (Sandbox Code Playgroud)
但根本不工作,如下所示
def upper(paras,threads):
def func:
some func
def multi_process(func, paras, threads):
with futures.ProcessPoolExecutor(max_workers=threads) as pool:
res = pool.map(func, paras, chunksize=threads)
return list(res)
p = multi_process(func,paras,threads)
return p
p = upper(paras,threads)
Run Code Online (Sandbox Code Playgroud)
没有警告或错误,但很长时间没有任何反应。
multiprocessing ×10
python ×10
class ×1
csv ×1
dataframe ×1
dictionary ×1
pandas ×1
pipe ×1
python-2.7 ×1
windows ×1