ale*_*inn 3 python-3.x jupyter-notebook
I am using Python 3.6.1 with Jupyter 5. My goal is to test how Portalocker handles concurrent appends to the same file.
To do that, I wrote a simple function that appends a single line to that file, and I run it in parallel with multiprocessing.Pool and Pool.map().
Here is the code from the Jupyter notebook.
Cell 1

from time import time
from multiprocessing import Pool
import portalocker

def f(*args):
    # busy-wait until one second after `start` so all workers write at roughly the same time
    while time() < start + 1:
        pass
    with open('portalocker_test.txt', 'a') as f:
        portalocker.lock(f, portalocker.LOCK_EX)  # exclusive lock while appending
        f.write(f'{time()}\n')
Cell 2

start = time()
with Pool(4) as p:
    p.map(f, range(4))
Cell 3

with open('portalocker_test.txt', 'r') as f:
    for line in f:
        print(line, end='')
If I run this code once, I get the expected result. Output of cell 3:
1495614277.189394
1495614277.1893928
1495614277.1893911
1495614277.1894028
However, if I run cell 2 again (without restarting the notebook), I get:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-5-db9c07d32724> in <module>()
1 start = time()
2 with Pool(4) as p:
----> 3 p.map(f, range(4))
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in map(self, func, iterable, chunksize)
258 in a list that is returned.
259 '''
--> 260 return self._map_async(func, iterable, mapstar, chunksize).get()
261
262 def starmap(self, func, iterable, chunksize=None):
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
606 return self._value
607 else:
--> 608 raise self._value
609
610 def _set(self, i, obj):
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
383 break
384 try:
--> 385 put(task)
386 except Exception as e:
387 job, ind = task[:2]
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py in send(self, obj)
204 self._check_closed()
205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))
207
208 def recv_bytes(self, maxlength=None):
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/reduction.py in dumps(cls, obj, protocol)
49 def dumps(cls, obj, protocol=None):
50 buf = io.BytesIO()
---> 51 cls(buf, protocol).dump(obj)
52 return buf.getbuffer()
53
TypeError: cannot serialize '_io.TextIOWrapper' object
The same error is raised if I read the file before running cell 2 for the first time. So everything works as long as I never open the file before running cell 2; if I have opened it beforehand, I get the error. That looks inconsistent to me. What is going on, and how can I fix it?
Also, the behavior is the same with or without Portalocker, so this is not a Portalocker issue. I haven't checked it in plain Python, but I really want this to run in Jupyter.
The problem is that you are reusing the same name f for different objects. In cell 3, the statement with open('portalocker_test.txt', 'r') as f rebinds the global name f to the file object, so when you rerun cell 2, p.map(f, range(4)) tries to pickle that _io.TextIOWrapper instead of your function, which is exactly what the traceback reports. In your case, either of the following should help.
Change the function name from f to function (or to any other name different from f):
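The effect is easy to reproduce on its own. Here is a minimal sketch, independent of Portalocker and of the fixes below; 'a' mode is used only so the snippet runs even if the file does not exist yet, and the rebinding is the same with 'r':

import pickle

def f(*args):
    pass

print(type(f))   # <class 'function'>

# This is what cell 3 does: the "as f" clause rebinds the global name f
with open('portalocker_test.txt', 'a') as f:
    pass

print(type(f))   # <class '_io.TextIOWrapper'> -- the (now closed) file object
pickle.dumps(f)  # raises TypeError, just like Pool.map's internal pickler does
                 # (the exact message varies by Python version)

Pool.map pickles whatever object the name f refers to at call time, so once cell 3 has run, the second run of cell 2 sends the file object to the workers instead of the function.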
Cell 1

from time import time
from multiprocessing import Pool
import portalocker

def function(*args):
    while time() < start + 1:
        pass
    with open('portalocker_test.txt', 'a') as f:
        portalocker.lock(f, portalocker.LOCK_EX)
        f.write(f'{time()}\n')
Cell 2

start = time()
with Pool(4) as p:
    p.map(function, range(4))
Or rename the object you get from open from f to file (or to any other name different from f):
Cell 1

from time import time
from multiprocessing import Pool
import portalocker

def f(*args):
    while time() < start + 1:
        pass
    with open('portalocker_test.txt', 'a') as file:
        portalocker.lock(file, portalocker.LOCK_EX)
        file.write(f'{time()}\n')
Cell 3

with open('portalocker_test.txt', 'r') as file:
    for line in file:
        print(line, end='')

Or do both.
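If you are ever unsure which object a name currently refers to in the running notebook, a quick check before calling p.map is enough. This is just an illustrative snippet, not part of the fix itself:

print(type(f).__name__, callable(f))
# prints: function True        -> safe to pass to p.map
# prints: TextIOWrapper False  -> the name has been shadowed by a file object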