Suppose I have a very large text file consisting of many lines that I want to reverse, and I don't care about the final order. The input file contains Cyrillic symbols. I use multiprocessing to process it on several cores.

I wrote this program:
# task.py
import multiprocessing as mp

POOL_NUMBER = 2

lock_read = mp.Lock()
lock_write = mp.Lock()

fi = open('input.txt', 'r')
fo = open('output.txt', 'w')

def handle(line):
    # In the future I want to do
    # some more complicated operations over the line
    return line.strip()[::-1]  # Reversing

def target():
    while True:
        try:
            with lock_read:
                line = next(fi)
        except StopIteration:
            break
        line = handle(line)
        with lock_write:
            print(line, file=fo)

pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)]
for p in pool:
    p.start()
for p in pool:
    p.join()

fi.close()
fo.close()
The program fails with this error:
Process Process-2:
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte
On the other hand, everything works fine if I set POOL_NUMBER = 1. But that makes no sense if I want to gain overall performance.

Why does this error happen? And how can I fix it?

I use Python 3.5.2.

I generated the data with this script:
# gen_file.py
from random import randint

LENGTH = 100
SIZE = 100000

def gen_word(length):
    # Random lowercase Cyrillic letters ('а' through 'я')
    return ''.join(
        chr(randint(ord('а'), ord('я')))
        for _ in range(length)
    )

if __name__ == "__main__":
    with open('input.txt', 'w') as f:
        for _ in range(SIZE):
            print(gen_word(LENGTH), file=f)
The problem here is that reading a file from multiple processes doesn't work the way you think: you can't share an open file object between processes. When the workers are forked, each child inherits the same underlying file descriptor, so both processes move a single shared file offset while each keeps its own read buffer and incremental UTF-8 decoder. A buffered read can therefore begin in the middle of a multi-byte character, which is exactly the UnicodeDecodeError you got.
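To see this, here is a tiny demonstration (my addition, POSIX-only because it uses os.fork directly): a child process that reads from an inherited file object moves the parent's file offset too, because both hold the same open file description:

import os

with open('input.txt', 'rb') as f:
    pid = os.fork()
    if pid == 0:       # Child: read a chunk through the inherited file object
        f.read(8192)
        os._exit(0)
    os.waitpid(pid, 0)
    # Parent: the offset has moved even though the parent never read anything
    print('parent offset:', f.tell())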
You could keep a global current_line variable and, each time, re-read the file and process the current line, which is not ideal.

Here is a different approach, using a process pool and the map method: I iterate over the file, and for each line I enqueue your target method:
from multiprocessing import Pool
import time

POOL_NUMBER = 8

def target(line):
    # Really need some processing here
    for _ in range(2**10):
        pass
    return line.strip()[::-1]

pool = Pool(processes=POOL_NUMBER)

with open('input.txt', 'r') as fi:
    t0 = time.time()
    processed_lines = pool.map(target, fi.readlines())
    print('Total time', time.time() - t0)

with open('output.txt', 'w') as fo:  # 'w' truncates, so we start with a fresh file
    for processed_line in processed_lines:
        print(processed_line, file=fo)
With 8 processes on my machine:

Total time 1.3367934226989746

And with 1 process:

Total time 4.324501991271973
This approach works best if your target function is CPU-bound. An alternative approach is to split the file into POOL_NUMBER chunks and have each process write its processed chunk (with a lock!) to the output file, as in the sketch below.
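A minimal sketch of that chunk-per-process idea (my addition; the names worker and write_lock are mine, and it assumes the file still fits in memory):

from multiprocessing import Process, Lock

POOL_NUMBER = 8

def worker(lines, lock):
    processed = [line.strip()[::-1] for line in lines]
    with lock:  # Serialize writers so chunks don't interleave mid-line
        with open('output.txt', 'a') as fo:
            for processed_line in processed:
                print(processed_line, file=fo)

if __name__ == '__main__':
    with open('input.txt', 'r') as fi:
        lines = fi.readlines()
    open('output.txt', 'w').close()  # Start with an empty output file
    write_lock = Lock()
    chunk = max(1, (len(lines) + POOL_NUMBER - 1) // POOL_NUMBER)
    workers = [
        Process(target=worker, args=(lines[i:i + chunk], write_lock))
        for i in range(0, len(lines), chunk)
    ]
    for p in workers:
        p.start()
    for p in workers:
        p.join()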
Yet another approach is to create a master process that does the writing job for the rest of the processes; a sketch of that idea follows.
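Here is a minimal sketch of that writer-process idea (my addition, not the answer's original example): workers pull lines from one multiprocessing.Queue and push results onto another, and a single writer process owns the output file. The names worker/writer and the None sentinel are illustrative choices:

from multiprocessing import Process, Queue

POOL_NUMBER = 8
SENTINEL = None  # Signals "no more work" / "this worker is done"

def worker(in_q, out_q):
    # Consume lines until we see the sentinel, then tell the writer we're done
    for line in iter(in_q.get, SENTINEL):
        out_q.put(line.strip()[::-1])
    out_q.put(SENTINEL)

def writer(out_q):
    # The only process that touches the output file
    finished = 0
    with open('output.txt', 'w') as fo:
        while finished < POOL_NUMBER:
            item = out_q.get()
            if item is SENTINEL:
                finished += 1
            else:
                print(item, file=fo)

if __name__ == '__main__':
    in_q, out_q = Queue(), Queue()
    workers = [Process(target=worker, args=(in_q, out_q))
               for _ in range(POOL_NUMBER)]
    w = Process(target=writer, args=(out_q,))
    for p in workers + [w]:
        p.start()
    with open('input.txt', 'r') as fi:
        for line in fi:  # Only the parent reads the file, so no shared handle
            in_q.put(line)
    for _ in workers:
        in_q.put(SENTINEL)
    for p in workers + [w]:
        p.join()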
EDIT
After your comment, I figured that you can't fit the file into memory. For that, you can just iterate over the file object, which reads it into memory line by line. But then we need to modify the code a bit:
from multiprocessing import Pool
import time

POOL_NUMBER = 8
CHUNK_SIZE = 50000

def target(line):
    # This is not a measurable task, since most of the time will be spent
    # writing the data; if you have a CPU-bound task, this code makes sense
    return line.strip()[::-1]

pool = Pool(processes=POOL_NUMBER)
open('output.txt', 'w').close()  # Just to make sure we start with an empty file

processed_lines = []
with open('input.txt', 'r') as fi:
    t0 = time.time()
    for line in fi:  # Reading the file line by line
        # Keep a reference to this task, but don't block on the result yet
        processed_lines.append(pool.apply_async(target, (line,)))
        if len(processed_lines) == CHUNK_SIZE:
            with open('output.txt', 'a') as fo:  # Append, so earlier chunks survive
                for processed_line in processed_lines:
                    print(processed_line.get(), file=fo)
            # Truncate the result list and let the garbage collector collect the
            # unused memory; if we don't clear the list we will run out of memory!
            processed_lines = []
    if processed_lines:  # Flush whatever is left over from the last chunk
        with open('output.txt', 'a') as fo:
            for processed_line in processed_lines:
                print(processed_line.get(), file=fo)
    print('Total time', time.time() - t0)
Keep in mind that you can use the CHUNK_SIZE variable to control how much memory you use. For me, 5000 was about 10K at most for each process.

P.S.

I think it would be best to split the big file into smaller files; that way you solve the read/write locking on the file, and it also becomes scalable to process (even on different machines!). See the sketch below.
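A small sketch of such a split on line boundaries (my addition; split_file and LINES_PER_PART are illustrative names):

LINES_PER_PART = 10000  # Lines per output part

def split_file(path):
    part = None
    with open(path, 'r') as fi:
        for i, line in enumerate(fi):
            if i % LINES_PER_PART == 0:  # Time to start a new part file
                if part:
                    part.close()
                part = open('{}.{}'.format(path, i // LINES_PER_PART), 'w')
            part.write(line)
    if part:
        part.close()

split_file('input.txt')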