不一致的TypeError:无法序列化_io.TextIOWrapper对象

ale*_*inn 3 python-3.x jupyter-notebook

我正在Jupyter 5上使用Python 3.6.1。我的目标是测试Portalocker如何管理在同一文件上的并发追加。

为此,我制作了一个简单的函数,该函数将单行追加到同一文件,并使用multiprocessing.Pool和Pool.map()并行运行该函数。

这是Jupyter笔记本中的代码。

单元1

from time import time
from multiprocessing import Pool
import portalocker


def f(*args):
    while time() < start + 1:
        pass
    with open('portalocker_test.txt', 'a') as f:
        portalocker.lock(f, portalocker.LOCK_EX)
        f.write(f'{time()}\n')
Run Code Online (Sandbox Code Playgroud)

单元格2

start = time()
with Pool(4) as p:
    p.map(f, range(4))
Run Code Online (Sandbox Code Playgroud)

单元格3

with open('portalocker_test.txt', 'r') as f:
    for line in f:
        print(line, end='')
Run Code Online (Sandbox Code Playgroud)

如果我运行此代码一次,便得到了预期的结果:

离开单元格3:

1495614277.189394
1495614277.1893928
1495614277.1893911
1495614277.1894028
Run Code Online (Sandbox Code Playgroud)

但是,如果我再次运行单元2(而不重新启动笔记本),则会得到:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-5-db9c07d32724> in <module>()
      1 start = time()
      2 with Pool(4) as p:
----> 3     p.map(f, range(4))

/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    258         in a list that is returned.
    259         '''
--> 260         return self._map_async(func, iterable, mapstar, chunksize).get()
    261 
    262     def starmap(self, func, iterable, chunksize=None):

/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    606             return self._value
    607         else:
--> 608             raise self._value
    609 
    610     def _set(self, i, obj):

/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    383                         break
    384                     try:
--> 385                         put(task)
    386                     except Exception as e:
    387                         job, ind = task[:2]

/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207 
    208     def recv_bytes(self, maxlength=None):

/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     49     def dumps(cls, obj, protocol=None):
     50         buf = io.BytesIO()
---> 51         cls(buf, protocol).dump(obj)
     52         return buf.getbuffer()
     53 

TypeError: cannot serialize '_io.TextIOWrapper' object
Run Code Online (Sandbox Code Playgroud)

如果在运行单元2之前读取文件,则会引发相同的错误。因此,如果我从未在运行单元2之前打开文件,则一切正常。如果我以前打开过文件,则会收到该错误。这对我来说很不一致。到底是怎么回事?怎么解决呢?

同样,使用或不使用Portalocker都不会更改此行为,因此这也不是Portalocker的问题。我还没有在普通的python上检查它,但是我非常想在Jupyter上运行它。

Aza*_*kov 5

问题是您应该避免为不同的对象使用相同的名称,在您的情况下应该有所帮助

  • 将功能名称从更改ffunction(或不同于的其他名称f

    单元1

    from time import time
    from multiprocessing import Pool
    import portalocker
    
    
    def function(*args):
        while time() < start + 1:
            pass
        with open('portalocker_test.txt', 'a') as f:
            portalocker.lock(f, portalocker.LOCK_EX)
            f.write(f'{time()}\n')
    
    Run Code Online (Sandbox Code Playgroud)

    单元格2

    start = time()
    with Pool(4) as p:
        p.map(function, range(4))
    
    Run Code Online (Sandbox Code Playgroud)

要么

  • 重命名文件中的对象与所获得openffile(或从另一个名称不同f):

    单元1

    from time import time
    from multiprocessing import Pool
    import portalocker
    
    
    def f(*args):
        while time() < start + 1:
            pass
        with open('portalocker_test.txt', 'a') as file:
            portalocker.lock(file, portalocker.LOCK_EX)
            file.write(f'{time()}\n')
    
    Run Code Online (Sandbox Code Playgroud)

    单元格3

    with open('portalocker_test.txt', 'r') as file:
        for line in file:
            print(line, end='')
    
    Run Code Online (Sandbox Code Playgroud)

或两者