我multi processing在Python中使用。以下是我的代码的演示:
在功能上main:
from multiprocessing import Process
def __name__ == "__main__":
print "Main program starts here."
SOME CODE....
process_1 = Process(target=proc1, args = (arg1, arg2))
process_2 = Process(target=proc2, args = (arg3, arg4))
process_1.start()
process_2.start()
process_1.join()
process_2.join()
Run Code Online (Sandbox Code Playgroud)
在函数proc1和中proc2:
def proc1(arg1, arg2):
print "Proc1 starts from here."
SOME CODE....
Run Code Online (Sandbox Code Playgroud)
所以我期望看到的输出是:
主程序从这里开始。
Proc1从这里开始。
Proc2从这里开始。
然而,我得到的是:
主程序从这里开始。
主程序从这里开始。
主程序从这里开始。
看来 和 都proc1启动proc2了main而不是进程。
我可以知道我的代码有什么问题吗?
非常感谢。
有没有可能使用多处理接口来加速我的代码?问题是这个接口使用了map函数,它只适用于1个函数。但我的代码有3个功能。我尝试将我的功能合并为一个,但没有成功。我的脚本从文件中读取站点的 URL 并对其执行 3 个功能。For 循环使它非常慢,因为我有很多 URL
import requests
def Login(url): #Log in
payload = {
'UserName_Text' : 'user',
'UserPW_Password' : 'pass',
'submit_ButtonOK' : 'return buttonClick;'
}
try:
p = session.post(url+'/login.jsp', data = payload, timeout=10)
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
print "site is DOWN! :", url[8:]
session.cookies.clear()
session.close()
else:
print 'OK: ', p.url
def Timer(url): #Measure request time
try:
timer = requests.get(url+'/login.jsp').elapsed.total_seconds()
except (requests.exceptions.ConnectionError):
print 'Request time: None'
print '-----------------------------------------------------------------'
else:
print 'Request time:', round(timer, 2), 'sec'
def Logout(url): # Log out …Run Code Online (Sandbox Code Playgroud) python multithreading python-2.7 python-requests python-multiprocessing
我有可以在 Windows 上运行的 Python 代码,但是在 Linux 上运行时它就会挂起。我正在使用 JPype,因此我怀疑多个共享进程尝试使用同一管道访问 Java 可能存在一些问题(创建了不同的进程,但挂在 JPype 行)。有没有办法强制在 Pathos 中生成以复制 Windows 实现?(例如常规多处理库中的 set_start_method 或 get_context ?)
谢谢。
我在任何地方看到 python 的共享内存实现(例如在 参考资料中multiprocessing),创建共享内存总是分配新内存。有没有办法创建共享内存对象并让它引用现有内存?目的是预先初始化数据值,或者更确切地说,如果我们已经有一个数组,则可以避免复制到新的共享内存中。根据我的经验,分配大型共享数组比将值复制到其中要快得多。
你好呀,
我偶然发现了一个问题ProcessPoolExecutor,进程访问数据时,它们不应该能够。让我解释:
我遇到的情况类似于下面的示例:我进行了多次运行,每次都以不同的参数开始。他们并行计算自己的东西,没有理由互相交互。现在,据我了解,当一个进程分叉时,它会复制自身。子进程与其父进程具有相同的(内存)数据,但如果它更改任何内容,它会在自己的副本上进行更改。如果我希望更改在子进程的生命周期内持续存在,我会调用队列、管道和其他 IPC 内容。
但其实我不知道!每个进程都为自己操作数据,这些数据不应传递到任何其他运行。不过,下面的示例显示了不同的情况。下一次运行(不是并行运行的运行)可以访问上一次运行的数据,这意味着数据尚未从进程中清除。
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process, set_start_method
class Static:
integer: int = 0
def inprocess(run: int) -> None:
cp = current_process()
# Print current state
print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static.integer}", flush=True)
# Check value
if Static.integer != 0:
raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")
# Update value
Static.integer = run + 1
def pooling():
cp = current_process()
# Get master's pid
print(f"[{cp.pid} {cp.name}] Start")
with ProcessPoolExecutor(max_workers=2) as …Run Code Online (Sandbox Code Playgroud) 我是多处理概念的新手。
from multiprocessing import Process
def square(x):
for x in numbers:
print('%s squared is %s' % (x, x**2))
if __name__ == '__main__':
numbers = [43, 50, 5, 98, 34, 35]
p = Process(target=square, args=('x',))
p.start()
p.join
print "Done"
Run Code Online (Sandbox Code Playgroud)
Done
43 squared is 1849
50 squared is 2500
5 squared is 25
98 squared is 9604
34 squared is 1156
35 squared is 1225
Run Code Online (Sandbox Code Playgroud)
我明白了,我们可以用来multiprocessing.cpu_count()获取系统中的CPU数量
然而,我未能实现两件感兴趣的事情。-
python parallel-processing multiprocessing python-multiprocessing
在我的代码中,我需要在 python 程序中运行多个工作线程实例。我最初创建了几个工作线程实例(比如 10 个),然后将它们添加到池中。每当客户端请求服务时,就应该调用并为客户端保留一个线程。完成任务后,线程应添加回池中。
到目前为止我已经编写了以下代码。但我不确定如何在池中永远运行线程(它们应该在池内休眠),在需要时调用并获取服务,并在处理后将它们添加回池中(应该再次休眠)。任何帮助,将不胜感激。
PRED = Queue(10)
class Worker(threading.Thread):
def __init__(self, threadID, name):
threading.Thread.__init__(self)
self.threadID =threadID
self.name = name
def run(self):
print("starting " + self.name + " thread")
while True:
??
print("Exiting " + self.name + " thread")
def work():
print("working")
time.sleep(3)
Run Code Online (Sandbox Code Playgroud)
如何使用所有 cpu 内核进行 asyncio - 除了 ProcessPoolExecutor 之外的任何其他选项?
我认为 asyncio 无法打破 GIL 限制(也许我错了),因此程序的执行速度将比踩踏版本快,但会在一个核心上执行。
我研究了一些例子,我发现一种方法是多处理和 ProcessPoolExecutor。
https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)
Run Code Online (Sandbox Code Playgroud)
这很好,但需要在进程之间进行“pickle”,因此需要一些开销并对传递的参数进行一些优化以减少“pickle”序列化。
使用上面这个简单的模式,我写了这样的测试代码(如果你不喜欢它,你可以跳过这段代码阅读,因为它和以前一样)。顺便说一句,这是我解析文件问题的最快解决方案。这部分代码不是整个程序。
def _match_general_and_specific_file_chunk(file_name):
with codecs.open(file_name, encoding='utf8') as f:
while True:
lines = f.readlines(sizehint=10000)
if not lines:
break
for line in lines:
general_match = RE_RULES.match(line)
if general_match:
specific_match = RULES[general_match.lastindex].match(line)
groups = list(specific_match.groups())
continue
async def _async_process_executor_match_general_and_specific_read_lines_with_limit_file_chunk():
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 python 将一个大的压缩文件(.gz)复制到另一个压缩文件(.gz)。我将对代码示例中不存在的数据执行中间处理。我希望能够使用带有锁的多处理来从多个进程并行写入新的 gzip,但我在输出 gz 文件上收到无效格式错误。
我认为这是因为锁不足以支持并行写入 gzip。由于压缩数据需要“了解”之前的数据,以便将正确的条目写入存档中,因此我认为 python 默认情况下无法处理此问题。我猜想每个进程都会保持自己对 gzip 输出的感知,并且这种状态在第一次写入后会有所不同。
如果我在不使用 gzip 的情况下打开脚本中的目标文件,那么这一切都有效。我还可以写入多个 gzip 并将它们合并,但如果可能的话更愿意避免这种情况。
这是我的源代码:
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock
def reader(infile, data_queue, coordinator_queue, chunk_size):
print("Reader Started.")
while True:
data_chunk = list(islice(infile, chunk_size))
data_queue.put(data_chunk)
coordinator_queue.put('CHUNK_READ')
if not data_chunk:
coordinator_queue.put('READ_DONE')
#Process exit
break
def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
print("Writer Started.")
while True:
queue_message = data_queue.get()
if (queue_message == 'DONE'):
outfile.flush()
coordinator_queue.put('WRITE_DONE')
#Process exit
break
else:
print("Writer",ID,"-","Write Lock:",write_lock) …Run Code Online (Sandbox Code Playgroud) 如何与另一个流程共享一个流程的价值?显然我可以通过多线程而不是多处理来做到这一点。多线程对于我的程序来说很慢。
我无法显示我的确切代码,所以我做了这个简单的例子。
from multiprocessing import Process
from threading import Thread
import time
class exp:
def __init__(self):
self.var1 = 0
def func1(self):
self.var1 = 5
print(self.var1)
def func2(self):
print(self.var1)
if __name__ == "__main__":
#multithreading
obj1 = exp()
t1 = Thread(target = obj1.func1)
t2 = Thread(target = obj1.func2)
print("multithreading")
t1.start()
time.sleep(1)
t2.start()
time.sleep(3)
#multiprocessing
obj = exp()
p1 = Process(target = obj.func1)
p2 = Process(target = obj.func2)
print("multiprocessing")
p1.start()
time.sleep(2)
p2.start()
Run Code Online (Sandbox Code Playgroud)
预期输出:
from multiprocessing import Process
from threading import Thread
import …Run Code Online (Sandbox Code Playgroud) python multiprocessing python-multithreading python-3.x python-multiprocessing
python ×8
python-3.x ×4
compression ×1
gzip ×1
linux ×1
pathos ×1
python-2.7 ×1
python-3.5 ×1
spawn ×1