TypeError: can't pickle _thread.lock objects

Jon*_*ell 25 python-3.x

I am trying to run two different functions at the same time with a shared queue and I get an error... how can I run two functions at the same time with a shared queue? This is Python 3.6 on Windows 7.

from multiprocessing import Process
from queue import Queue
import logging

def main():
    x = DataGenerator()
    try:
        x.run()
    except Exception as e:
        logging.exception("message")


class DataGenerator:

    def __init__(self):
        logging.basicConfig(filename='testing.log', level=logging.INFO)

    def run(self):
        logging.info("Running Generator")
        queue = Queue()
        Process(target=self.package, args=(queue,)).start()
        logging.info("Process started to generate data")
        Process(target=self.send, args=(queue,)).start()
        logging.info("Process started to send data.")

    def package(self, queue): 
        while True:
            for i in range(16):
                datagram = bytearray()
                datagram.append(i)
                queue.put(datagram)

    def send(self, queue):
        byte_array = bytearray()
        while True:
            size_of_queue = queue.qsize()
            logging.info(" queue size %s", size_of_queue)
            if size_of_queue > 7:
                for i in range(1, 8):
                    packet = queue.get()
                    byte_array.extend(packet)  # packet is a bytearray, so extend rather than append
                logging.info("Sending datagram ")
                print(str(byte_array))
                byte_array = bytearray()  # reset the buffer for the next batch

if __name__ == "__main__":
    main()

The log shows the error; I tried running the console as administrator and I get the same message...

INFO:root:Running Generator
ERROR:root:message
Traceback (most recent call last):
  File "test.py", line 8, in main
    x.run()
  File "test.py", line 20, in run
    Process(target=self.package, args=(queue,)).start()
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

小智 9

I ran into the same problem with Pool() in Python 3.6.3.

The error received was: TypeError: can't pickle _thread.RLock objects

Suppose we want to add some number num_to_add to each element of some list num_list in parallel. Schematically, the code looks like this:

from multiprocessing import Pool, Manager

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1 

        self.run()

    def run(self):
        new_num_list = Manager().list()

        pool = Pool(processes=50)
        results = [pool.apply_async(self.run_parallel, (num, new_num_list))
                      for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()

    def run_parallel(self, num, shared_new_num_list):
        new_num = num + self.num_to_add # uses class parameter
        shared_new_num_list.append(new_num)

The problem here is that self in the function run_parallel() cannot be pickled, because it is a class instance. Moving the parallelized function run_parallel() out of the class helps. But that is not the best solution on its own, since the function may still need class attributes such as self.num_to_add, which then have to be passed in as arguments.

Solution:

from multiprocessing import Pool, Manager

def run_parallel(num, shared_new_num_list, to_add): # to_add is passed as an argument
    new_num = num + to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1

        self.run()

    def run(self):
        new_num_list = Manager().list()

        pool = Pool(processes=50)
        results = [pool.apply_async(run_parallel, (num, new_num_list, self.num_to_add)) # num_to_add is passed as an argument
                      for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()
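
A quick usage note for the solution above (the values here are just an example): each worker appends num + to_add to the shared list, so with the inputs shown the Manager list created in run() ends up holding 5, 3, 6, 8 in some order. On Windows the call also has to sit under the __main__ guard:

if __name__ == "__main__":
    # Hypothetical example values: adds 1 to each of [4, 2, 5, 7] in parallel;
    # the shared list built in run() receives [5, 3, 6, 8] (order not guaranteed).
    DataGenerator([4, 2, 5, 7], 1)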

The other suggestions above did not help me.


Pvd*_*vdL 7

See this related question: multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

Move the queue to self instead of passing it as an argument to your functions package and send.

  • This is a bad answer. There is no proper explanation, just a link to another question and no code. (4 upvotes)
  • What do you mean by moving the queue? (2 upvotes)
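
What "move the queue to self" might look like, as a minimal sketch (one added assumption: it also switches to multiprocessing.Queue, as the answer below suggests, because a queue.Queue attribute still could not be pickled when the instance is shipped to a child process):

from multiprocessing import Process, Queue

class DataGenerator:

    def __init__(self):
        # Keep the queue on the instance instead of passing it as an argument.
        # A multiprocessing.Queue can be pickled while a child process is being
        # spawned, so the instance (queue included) reaches both children.
        self.queue = Queue()

    def run(self):
        Process(target=self.package).start()
        Process(target=self.send).start()

    def package(self):
        for i in range(16):
            datagram = bytearray()
            datagram.append(i)
            self.queue.put(datagram)

    def send(self):
        for _ in range(16):
            print(self.queue.get())

if __name__ == "__main__":
    DataGenerator().run()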

小智 5

You need to change from queue import Queue to from multiprocessing import Queue.

The root cause is that the former Queue is designed for the threading module, while the latter is designed for use with multiprocessing.Process.

For more details you can read the source code, or contact me!
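
Applied to the code in the question, a minimal sketch of that change (only the import is swapped; the rest of the question's class is assumed unchanged):

from multiprocessing import Process, Queue  # Queue now comes from multiprocessing
import logging

class DataGenerator:

    def run(self):
        logging.info("Running Generator")
        # A multiprocessing.Queue can be serialized while the child processes are
        # being spawned, so passing it in args no longer raises the
        # "can't pickle _thread.lock objects" error.
        queue = Queue()
        Process(target=self.package, args=(queue,)).start()
        Process(target=self.send, args=(queue,)).start()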