Why doesn't multiprocessing.Lock() lock a shared resource in Python?

Fom*_*aut 9 python multithreading multiprocessing python-3.x

Suppose I have a very big text file consisting of many lines that I want to reverse. And I don't care about the final order. The input file contains Cyrillic symbols. I use multiprocessing to process the file on several cores.

I wrote a program like this:

# task.py

import multiprocessing as mp


POOL_NUMBER = 2


lock_read = mp.Lock()
lock_write = mp.Lock()

fi = open('input.txt', 'r')
fo = open('output.txt', 'w')

def handle(line):
    # In the future I want to do
    # some more complicated operations over the line
    return line.strip()[::-1]  # Reversing

def target():
    while True:
        try:
            with lock_read:
                line = next(fi)
        except StopIteration:
            break

        line = handle(line)

        with lock_write:
            print(line, file=fo)

pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)]
for p in pool:
    p.start()
for p in pool:
    p.join()

fi.close()
fo.close()

The program fails with this error:

Process Process-2:
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte

On the other hand, everything works fine if I set POOL_NUMBER = 1. But that makes no sense if I want to gain overall performance.

Why does this error happen? And how can I fix it?

I use Python 3.5.2.

I generated the data with this script:

# gen_file.py

from random import randint


LENGTH = 100
SIZE = 100000


def gen_word(length):
    return ''.join(
        chr(randint(ord('а'), ord('я')))
        for _ in range(length)
    )


if __name__ == "__main__":
    with open('input.txt', 'w') as f:
        for _ in range(SIZE):
            print(gen_word(LENGTH), file=f)

Or *_*uan 4

The problem here is that reading a file from multiple processes doesn't work the way you think it does; you can't share an open file object between processes.

You could create a global current_line variable and, each time, read the file and handle the current line, but that isn't ideal.
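
For example, a minimal sketch of the "don't share the handle" rule could look like this (the worker function and the every-POOL_NUMBER-th-line split are only illustrative assumptions, not the approach I use below): each process opens its own file objects, and only the writes are serialized with a lock.

import multiprocessing as mp

POOL_NUMBER = 2

def worker(index, write_lock):
    # Each process opens its own file objects, so no open handle is shared across processes
    with open('input.txt', 'r') as fi, open('output.txt', 'a') as fo:
        for line_no, line in enumerate(fi):
            if line_no % POOL_NUMBER != index:
                continue  # this line belongs to another worker
            reversed_line = line.strip()[::-1]
            with write_lock:   # only one process writes at a time
                print(reversed_line, file=fo)
                fo.flush()     # flush while still holding the lock

if __name__ == '__main__':
    open('output.txt', 'w').close()  # start from an empty output file
    write_lock = mp.Lock()
    procs = [mp.Process(target=worker, args=(i, write_lock)) for i in range(POOL_NUMBER)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()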

Here is a different approach, using a process pool and its map method. I iterate over the file and, for each line, enqueue your target method:

from multiprocessing import Pool
import time
import os

POOL_NUMBER = 8

def target(line):
    # Really need some processing here
    for _ in range(2**10):
        pass
    return line[::-1]


pool = Pool(processes=POOL_NUMBER)
os.truncate('output.txt', 0)  # Just to make sure we start with a clean output file (the file must already exist)
with open('input.txt', 'r') as fi:
    t0 = time.time()
    processed_lines = pool.map(target, fi.readlines())
    print('Total time', time.time() - t0)

    with open('output.txt', 'w') as fo:
        for processed_line in processed_lines:
            fo.writelines(processed_line)

With 8 processes on my machine: Total time 1.3367934226989746

And with 1 process: Total time 4.324501991271973

This approach works best if your target function is CPU-bound. An alternative is to split the file into POOL_NUMBER chunks and have each process write its processed chunk of data (with a lock!) to the output file; a rough sketch of that idea follows.
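
A rough sketch of that chunked variant could look like this (the process_chunk helper and the way the chunks are built are only illustrative assumptions):

from multiprocessing import Lock, Process

POOL_NUMBER = 4

def process_chunk(chunk, write_lock):
    processed = [line.rstrip('\n')[::-1] for line in chunk]  # reverse every line in this chunk
    with write_lock:  # only one process appends to output.txt at a time
        with open('output.txt', 'a') as fo:
            fo.write('\n'.join(processed) + '\n')

if __name__ == '__main__':
    with open('input.txt', 'r') as fi:
        lines = fi.readlines()
    chunk_size = max(1, (len(lines) + POOL_NUMBER - 1) // POOL_NUMBER)
    chunks = [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]

    open('output.txt', 'w').close()  # start from an empty output file
    write_lock = Lock()
    procs = [Process(target=process_chunk, args=(chunk, write_lock)) for chunk in chunks]
    for p in procs:
        p.start()
    for p in procs:
        p.join()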

Yet another approach is to create a master process that does the writing for the rest of the processes; a sketch of that idea follows.
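
A rough sketch of that master-writer idea could look like this (the queue-based layout and the SENTINEL value are my own illustrative choices, not a definitive implementation):

from multiprocessing import Process, Queue

POOL_NUMBER = 4
SENTINEL = None  # marks the end of a stream of lines

def worker(in_queue, out_queue):
    # Reverse every line received from the master until the sentinel arrives
    for line in iter(in_queue.get, SENTINEL):
        out_queue.put(line.rstrip('\n')[::-1])
    out_queue.put(SENTINEL)  # tell the writer this worker is done

def writer(out_queue):
    # The only process that touches output.txt
    finished = 0
    with open('output.txt', 'w') as fo:
        while finished < POOL_NUMBER:
            item = out_queue.get()
            if item is SENTINEL:
                finished += 1
            else:
                print(item, file=fo)

if __name__ == '__main__':
    in_queue, out_queue = Queue(), Queue()
    writer_proc = Process(target=writer, args=(out_queue,))
    writer_proc.start()
    workers = [Process(target=worker, args=(in_queue, out_queue)) for _ in range(POOL_NUMBER)]
    for w in workers:
        w.start()
    with open('input.txt', 'r') as fi:
        for line in fi:
            in_queue.put(line)
    for _ in workers:
        in_queue.put(SENTINEL)  # one sentinel per worker
    for w in workers:
        w.join()
    writer_proc.join()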

EDIT

After your comment, I realized that you can't fit the file into memory. For that, you can simply iterate over the file object to read it into memory line by line. But then we need to modify the code a bit more:

from multiprocessing import Pool
import time
import os

POOL_NUMBER = 8
CHUNK_SIZE = 50000

def target(line):
    # This is not a measurable task, since most of the time will be spent on writing the data;
    # if you have a CPU-bound task, this code will make more sense
    return line[::-1]


pool = Pool(processes=POOL_NUMBER)
os.truncate('output.txt', 0)  # Just to make sure we start with a clean output file (the file must already exist)
processed_lines = []

with open('input.txt', 'r') as fi:
    t0 = time.time()
    for line in fi:  # iterate the file object, reading one line at a time into memory
        processed_lines.append(pool.apply_async(target, (line,)))  # Keep a reference to this task, but don't block waiting for its result

        if len(processed_lines) == CHUNK_SIZE:
            with open('output.txt', 'a') as fo:  # append this chunk of results to the output file
                for processed_line in processed_lines:
                    fo.writelines(processed_line.get())
            # Truncate the result list and let the garbage collector reclaim the unused memory;
            # if we don't clear the list we will run out of memory!
            processed_lines = []

    # Write out any results left over from the last, partially filled chunk
    with open('output.txt', 'a') as fo:
        for processed_line in processed_lines:
            fo.writelines(processed_line.get())
    print('Total time', time.time() - t0)

Keep in mind that you can play with the CHUNK_SIZE variable to control how much memory is used. For me, 5000 was at most about 10K per process.

P.S.

I think it would be best to split the big file into smaller files; that way you avoid the read/write locking on a single file and also make the processing scalable (even across different machines!). A minimal sketch of such a split follows.
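
A minimal sketch of such a split could look like this (the part-file naming and the LINES_PER_PART value are just assumptions):

LINES_PER_PART = 25000

def split_file(path='input.txt', lines_per_part=LINES_PER_PART):
    # Write the input out as several part files so that each process (or machine)
    # can later work on its own file without any shared read/write locking
    part, count, out = 0, 0, None
    with open(path, 'r') as fi:
        for line in fi:
            if out is None or count == lines_per_part:
                if out is not None:
                    out.close()
                out = open('input.part%04d.txt' % part, 'w')
                part, count = part + 1, 0
            out.write(line)
            count += 1
    if out is not None:
        out.close()
    return part  # number of part files written

if __name__ == '__main__':
    print('wrote', split_file(), 'part files')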