"在Windows上使用多处理时,无法选择<type'_csv.reader'>"错误

Ron*_*Ron 5 python

我正在编写一个多处理程序来使用Windows并行处理大型.CSV文件.

我找到了一个类似问题的优秀例子.在Windows下运行时,我收到一个错误,即csv.reader不是Picklable.

我想我可以在阅读器子流程中打开CSV文件,只需从父进程发送文件名即可.但是,我想传递一个已打开的CSV文件(就像代码应该这样做),具有特定的状态,即真正使用共享对象.

任何想法如何在Windows下或那里缺少什么?

这是代码(我为了便于阅读而重新发布):

"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
            """Parses the input CSV and yields tuples with the index of the row
            as the first element, and the integers of the row as the second
            element.

            The index is zero-index based.

            The data is then sent over inqueue for the workers to do their
            thing.  At the end the input thread sends a 'STOP' message for each
            worker.
            """
            for i, row in enumerate(self.in_csvfile):
                row = [ int(entry) for entry in row ]
                self.inq.put( (i, row) )

            for i in range(self.numprocs):
                self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
                self.outq.put( (i, sum(row)) )
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.

        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across threads so open/close
        # and use it all in the same thread or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    #if yes are write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])
Run Code Online (Sandbox Code Playgroud)

在Windows下运行时,这是我收到的错误:

Traceback (most recent call last):
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 130, in <module>
    main(sys.argv[1:])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 127, in main
    c = CSVWorker(opts.numprocs, args[0], args[1])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 44, in __init__
    self.pin.start()
  File "C:\Python27\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Python27\lib\multiprocessing\forking.py", line 271, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Python27\lib\multiprocessing\forking.py", line 193, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Python27\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\multiprocessing\forking.py", line 66, in dispatcher
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 401, in save_reduce
    save(args)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 548, in save_tuple
    save(element)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 396, in save_reduce
    save(cls)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 753, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <type '_csv.reader'>: it's not the same object as _csv.reader
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 374, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError
Run Code Online (Sandbox Code Playgroud)

Sin*_*ion 5

您遇到的问题是由于使用 CSVWorker 类的方法作为进程目标引起的;并且该类具有无法腌制的成员;那些打开的文件永远不会工作;

你想要做的是把那个类分成两个类;一个协调所有工作子进程,另一个实际执行计算工作。工作进程将文件名作为参数并根据需要打开单个文件,或者至少等到他们调用了他们的工作方法并在那时才打开文件。它们也可以将multiprocessing.Queues 作为参数或实例成员;可以安全地通过。

在某种程度上,你已经有点这样做了;您的write_output_csv方法正在子进程中打开它的文件,但您的parse_input_csv方法期望找到一个已经打开并准备好的文件作为self. 坚持以另一种方式做,你应该处于良好的状态。