Rob*_*ing 6 python multiprocessing python-3.x
我有以下python程序,该程序启动三个进程,每个进程都使用继承的文件句柄将10000个随机行写入同一文件:
import multiprocessing
import random
import string
import traceback
if __name__ == '__main__':
# clear out the file first
open('out.txt', 'w')
# initialise file handle to be inherited by sub-processes
file_handle = open('out.txt', 'a', newline='', encoding='utf-8')
process_count = 3
# routine to be run by sub-processes
# adds n lines to the file
def write_random_rows(n):
try:
letters = string.ascii_lowercase
for _ in range(n):
s = ''.join(random.choice(letters) for _ in range(100))
file_handle.write(s+"\n")
except Exception:
traceback.print_exc()
if __name__ == '__main__':
# initialise the multiprocessing pool
process_pool = multiprocessing.Pool(processes=process_count)
# write the rows
for i in range(process_count):
process_pool.apply_async(write_random_rows, (10000,))
# write_random_rows(10000)
# wait for the sub-processes to finish
process_pool.close()
process_pool.join()
Run Code Online (Sandbox Code Playgroud)
运行此程序的结果是,我希望文件包含30000行。如果我write_random_rows(10000)
在主循环中运行(上述程序中的注释行),则按预期将30000行写入文件。但是,如果我运行非注释行,则文件中process_pool.apply_async(write_random_rows, (10000,))
最终将有15498行。
奇怪的是,无论我重新运行此脚本多少次,我在输出文件中总会得到相同(不正确)的行数。
我可以通过从内部初始化文件句柄write_random_rows()
(即在子流程执行内部)来解决此问题,这表明继承的文件句柄以某种方式相互干扰。如果它与某种竞争条件有关,我希望每次运行脚本时行数都会改变。为什么会发生此问题?
此问题是由于以下原因引起的:
分叉过程导致父级和子级共享 posix文件描述符。在原始写入的情况下这应该不会导致数据丢失,但没有任何形式的父母和孩子总是导致炒交织数据之间的同步。
但是,在进程进行独立缓冲的情况下,数据可能会丢失,具体取决于实现缓冲写入的方式。
因此,在这种情况下,一个有用的实验将涉及在不涉及缓冲的情况下复制您的问题。这可以通过两种方式完成:
使用open(..., mode='ab', buffering=0)
......然后,因为这是一个二进制文件,确保所有写入编码来bytes
使用
file_handle.write(bytes(s+"\n", encoding="utf-8"))
Run Code Online (Sandbox Code Playgroud)
这样做会导致文件包含30,000行,大小为3030000字节(按预期)
跳过一些障碍,以io.TextIOWrapper
使用非默认选项打开文件来禁用缓冲。我们无法控制所需的标志,open
因此将其创建为:
file_handle = io.TextIOWrapper(
io.BufferedWriter(
io.FileIO("out.txt", mode="a"),
buffer_size=1),
newline='', encoding="utf-8",
write_through=True)
Run Code Online (Sandbox Code Playgroud)
这也将导致文件30,000行,大小为3030000字节(按预期)
正如评论者所指出的那样,在Python 3.7上,原始代码生成的文件包含29,766行,而不是30,000行。每个工人这是78行短。由两名工作人员运行该代码将生成一个包含19,844行的文件(每个工作人员也短缺78行)。
为什么?通常的做法是使用退出一个分叉的子进程os._exit
,这似乎并不是将每个子进程中的剩余缓冲区刷新到磁盘上……这正好解释了每个子进程缺少的78行。
io.DEFAULT_BUFFER_SIZE
)为8192字节。ceil(8192 / 101) = 82
行刷新一次。也就是说,81行将几乎充满缓冲区,而第82行将导致前面的81行及其自身被刷新。10,000 % 82 = 78
,每个子代的缓冲区中都有剩余行。因此,它会出现丢失的数据是具有缓冲的数据没有被刷新。因此,进行以下更改:
def write_random_rows(n):
...
except Exception:
traceback.print_exc()
# flush the file
file_handle.flush()
Run Code Online (Sandbox Code Playgroud)
将产生所需的30,000行。
注意:
在这两种情况下,通过延迟对子进程的打开或跨派生任何打开的文件句柄来确保子进程不共享文件句柄几乎总是更好的选择dup
。