标签: multiprocessing

python:使用多处理时访问变量的问题

我是 python 中多处理概念的新手,当我尝试在我的代码中包含多处理时,我在访问变量时遇到了问题。对不起,如果我听起来很天真,但我就是想不通。下面是我的场景的一个简单版本。

class Data:
    def __init__(self):
        self.data = "data"
    def datameth(self):
        print self.data
        print mainvar

class First:
    def __init__(self):
        self.first = "first"
    def firstmeth(self):
        d = Data()
        d.datameth()
        print self.first

def mymethod():
    f = First()
    f.firstmeth()

if __name__ == '__main__':
    mainvar = "mainvar"
    mymethod()
Run Code Online (Sandbox Code Playgroud)

当我运行它时,它运行良好并给出输出:

data
mainvar
first
Run Code Online (Sandbox Code Playgroud)

但是当我尝试mymethod()作为一个进程运行时

from multiprocessing import Process
class Data:
    def __init__(self):
        self.data = "data"
    def datameth(self):
        print self.data
        print mainvar

class First:
    def __init__(self):
        self.first = "first"
    def firstmeth(self):
        d = …
Run Code Online (Sandbox Code Playgroud)

python class multiprocessing python-2.7 python-multiprocessing

1
推荐指数
1
解决办法
1040
查看次数

how to write content of df into csv file using multiprocessing in python

I have a function that writes the content of df into csv file.

def writeToCSV(outDf, defFile, toFile, retainFlag=True, delim='\t', quotechar='"'):
    headers = []
    fid = open(defFile, 'r')
    for line in fid:
        headers.append(line.replace('\r','').split('\n')[0].split('\t')[0])
    df = pd.DataFrame([], columns=headers)
    for header in outDf.columns.values:
        if header in headers:
            df[header] = outDf[header]

    df.to_csv(toFile, sep=delim, quotechar=quotechar, index=False, encoding='utf-8')
Run Code Online (Sandbox Code Playgroud)

How can i parallelize this process? Currently i am using following code

def writeToSchemaParallel(outDf, defFile, toFile, retainFlag=True, delim='\t', quotechar='"'):
    logInfo('Start writingtoSchema in parallel...', 'track')
    headers = []
    fid …
Run Code Online (Sandbox Code Playgroud)

python csv multiprocessing dataframe pandas

1
推荐指数
1
解决办法
3734
查看次数

如何在python的多处理中跨进程共享大型只读字典/列表?

我有一个 18Gb 的 pickle 文件,我需要跨进程访问它。我尝试使用

from multiprocessing import Manager
import cPickle as pkl
manager = Manager()
data = manager.dict(pkl.load(open("xyz.pkl","rb")))
Run Code Online (Sandbox Code Playgroud)

但是,我遇到以下问题:

IOError: [Errno 11] Resource temporarily unavailable 
Run Code Online (Sandbox Code Playgroud)

有人建议这可能是因为套接字超时,但它似乎不是因为增加超时没有帮助。我该怎么做。还有其他有效的跨进程共享数据的方法吗?

python multithreading multiprocessing

1
推荐指数
1
解决办法
1095
查看次数

如何使用多处理来循环浏览一个大的 URL 列表?

问题:检查超过 1000 个 url 的列表并获取 url 返回码 (status_code)。

我的脚本有效,但速度很慢。

我认为必须有一种更好的、pythonic(更漂亮)的方式来做到这一点,在那里我可以产生 10 或 20 个线程来检查 url 并收集响应。(IE:

200 -> www.yahoo.com
404 -> www.badurl.com
...
Run Code Online (Sandbox Code Playgroud)

输入文件:Url10.txt

www.example.com
www.yahoo.com
www.testsite.com
Run Code Online (Sandbox Code Playgroud)

....

import requests

with open("url10.txt") as f:
    urls = f.read().splitlines()

print(urls)
for url in urls:
    url =  'http://'+url   #Add http:// to each url (there has to be a better way to do this)
    try:
        resp = requests.get(url, timeout=1)
        print(len(resp.content), '->', resp.status_code, '->', resp.url)
    except Exception as e:
        print("Error", url)
Run Code Online (Sandbox Code Playgroud)

挑战: 通过多处理提高速度。


多处理

但它不工作。我收到以下错误:(注意:我不确定我是否正确实现了这一点) …

python multithreading multiprocessing python-multiprocessing

1
推荐指数
1
解决办法
4148
查看次数

Python 等待多处理池中的进程完成而不关闭池或使用 map()

我有一个像下面这样的代码段

pool = multiprocessing.Pool(10)
for i in range(300):
    for m in range(500):
        data = do_some_calculation(resource)
        pool.apply_async(paralized_func, data, call_back=update_resource)
    # need to wait for all processes finish
    # {...}
    # Summarize resource
    do_something_with_resource(resource)
Run Code Online (Sandbox Code Playgroud)

所以基本上我有2个循环。我在循环外初始化进程池以避免过热。在第二个循环结束时,我想总结所有过程的结果。

问题是我不能pool.map()因为data输入的变化而等待。我不能使用pool.join()andpool.close()或者因为我仍然需要pool在第一个循环的下一次迭代中使用。

在这种情况下等待进程完成的好方法是什么?

我尝试在第二个循环结束时检查 pool._cache。

while len(process_pool._cache) > 0:
    sleep(0.001)
Run Code Online (Sandbox Code Playgroud)

这种方式有效,但看起来很奇怪。有一个更好的方法吗?

python multiprocessing

1
推荐指数
1
解决办法
9528
查看次数

如何同时运行多个python文件?

如何同时运行多个python文件?有 3 个文件:bot_1.py、bot_2.py、bot_3.py。我想同时运行它们。我附上了代码。我应该在工作函数中写什么来使这个脚本工作?我将不胜感激任何帮助。

import multiprocessing
import subprocess
def worker(file):
    #your subprocess code
    subprocess.Popen(['screen', './bot_1.py'])
    subprocess.Popen(['screen', './bot_2.py'])
    subprocess.Popen(['screen', './bot_3.py'])

if __name__ == '__main__':
    files = ["bot_1.py","bot_2.py","bot_3.py"]
    for i in files:
        p = multiprocessing.Process(target=worker(i))
        p.start()
Run Code Online (Sandbox Code Playgroud)

python multiprocessing

1
推荐指数
1
解决办法
9891
查看次数

带有大对象的 Python 多处理管道将挂起

我在下面有一个简单的代码片段来演示这个问题。

from multiprocessing import Pipe
import time

recv_end, send_end = Pipe(duplex=False)
d = {'word'+str(elem): elem for elem in range(3000)}

start_time = time.time()
send_end.send(d)
print('--- %s seconds ---' % (time.time()-start_time))
Run Code Online (Sandbox Code Playgroud)

以上工作正常,对我的目的来说足够快,没问题。但是如果我将大小设置为 5000,它就会无限期地挂起:

from multiprocessing import Pipe
import time

recv_end, send_end = Pipe(duplex=False)
d = {'word'+str(elem): elem for elem in range(5000)}  # changed to 5000

start_time = time.time()
send_end.send(d)
print('--- %s seconds ---' % (time.time()-start_time))
Run Code Online (Sandbox Code Playgroud)

管道是否有大小限制,或者这是一个不可重现的问题?如果你把尺寸做得更大呢?如果有大小限制,避免这个问题并通过管道发送大字典的最佳方法是什么?提前致谢!

python dictionary pipe multiprocessing

1
推荐指数
1
解决办法
1629
查看次数

是否可以在 Python 的线程中生成一个进程?

我正在编写一个程序,该程序生成一个进程并在某些条件下重新启动该进程。例如,如果子进程在一段时间内不再向母进程发送数据,我希望母进程终止子进程并重新启动它。我以为我可以使用线程从子进程接收数据并重新启动子进程,但它不像我想的那样工作。

import numpy as np
import multiprocessing as mp
import threading
import time
from apscheduler.schedulers.background import BackgroundScheduler

pipe_in, pipe_out = mp.Pipe()

class Mother():
    def __init__(self):
        self.pipe_out = pipe_out

        self.proc = mp.Process(target = self.test_func, args=(pipe_in, ))
        self.proc.start()

        self.thread = threading.Thread(target=self.thread_reciever, args=(self.pipe_out, ))
        self.thread.start()

    def thread_reciever(self, pipe_out):
        while True:
            value = pipe_out.recv()

            print(value)
            if value == 5:
                self.proc.terminate()
                time.sleep(2)
                self.proc = mp.Process(target = self.test_func)
                self.proc.start()

    def test_func(self, pipe_in):
        for i in range(10):
            pipe_in.send(i)
            time.sleep(1)


if __name__ == '__main__':
    r = Mother()
Run Code Online (Sandbox Code Playgroud)

它打印出这个错误。 …

python windows multithreading multiprocessing python-multiprocessing

1
推荐指数
1
解决办法
2478
查看次数

为什么 multiprocessing.Pool 不能改变全局变量?

我想用来multiprocessing.Pool加载大型数据集,这是我正在使用的代码:

import os
from os import listdir
import pickle
from os.path import join
import multiprocessing as mp

db_path = db_path
the_files = listdir(db_path)
fp_dict = {}
def loader(the_hash):
        global fp_dict
        the_file = join(db_path, the_hash)
        with open(the_file, 'rb') as source:
                fp_dict[the_hash] = pickle.load(source)
        print(len(fp_dict))
def parallel(the_func, the_args):
        global fp_dict
        pool = mp.Pool(mp.cpu_count())
        pool.map(the_func, the_args)
        print(len(fp_dict))
parallel(loader, the_files)
Run Code Online (Sandbox Code Playgroud)

有趣的fp_dict是,当代码运行时, 的长度会发生变化。但是,只要进程终止,长度fp_dict就为零。为什么?如何使用 修改全局变量multiprocessing.Pool

python multiprocessing

1
推荐指数
1
解决办法
1308
查看次数

python ProcessPoolExecutor 在函数中不起作用

pyhton ProcessPoolExecutor 在公共行中工作,但在添加到函数后不运行

它是这样工作的

from concurrent import futures

def multi_process(func, paras, threads):
    with futures.ProcessPoolExecutor(max_workers=threads) as pool:
        res = pool.map(func, paras, chunksize=threads)
    return list(res)
p = multi_process(func,paras,threads)
Run Code Online (Sandbox Code Playgroud)

但根本不工作,如下所示

def upper(paras,threads):
    def func:
        some func
    def multi_process(func, paras, threads):
        with futures.ProcessPoolExecutor(max_workers=threads) as pool:
            res = pool.map(func, paras, chunksize=threads)
        return list(res)
    p = multi_process(func,paras,threads)
    return p
p = upper(paras,threads)
Run Code Online (Sandbox Code Playgroud)

没有警告或错误,但很长时间没有任何反应。

python multiprocessing

1
推荐指数
1
解决办法
1992
查看次数