Multiprocessing - processes won't join?

Har*_*ton 2 python multiprocessing

TL;DR - the consumer processes finish but never join, no error is raised, and the script runs forever, stuck at the join statement.

My aim is to speed up a data-retrieval process, but I don't know how many "tasks" (pieces of data to retrieve) there might be. So I made a modified version of the poison-pill method: an if statement lets a task recognise when there is no more information to retrieve, and that task then becomes the poison pill.

I have posted a proof, which is a working example of my poison-pill method, and the full script, which is, as the name suggests, the complete script. (Both should run as-is.)

The proof

import multiprocessing


class Task:
    def __init__(self, number):
        self.number = number

    def __call__(self):
        """Find officer and company data and combine and save it"""

        try:
            # 'gather some data!'
            self.result = self.number*2
            print(self.number)
            # 'fake' finding no data
            if self.result >= 8:
                raise NameError
        except NameError:
            # become poison pill once latest is done
            self.result = None

    def output(self):
        return self.result


class Consumer(multiprocessing.Process):
    """Handle process and re-queue complete tasks"""
    def __init__(self, waiting_queue, complete_queue):
        multiprocessing.Process.__init__(self)
        self.waiting_queue = waiting_queue
        self.complete_queue = complete_queue

    def run(self):
        """process tasks until queue is empty"""
        proc_name = self.name
        while True:
            current_task = self.waiting_queue.get()
            current_task()
            if current_task.output() is None:
                print('{}: Exiting, poison pill reached'.format(proc_name))
                self.waiting_queue.task_done()
                break
            self.waiting_queue.task_done()
            self.complete_queue.put(current_task)
        print('{}: complete'.format(proc_name))


class Shepard:
    """Handle life cycle of Consumers, Queues and Tasks"""
    def __init__(self):
        pass

    def __call__(self, start_point):

        # initialize queues
        todo = multiprocessing.JoinableQueue()
        finished = multiprocessing.JoinableQueue()

        # start consumers
        num_consumers = multiprocessing.cpu_count() * 2
        consumers = [Consumer(todo, finished) for i in range(num_consumers)]
        for q in consumers:
            q.start()

        # decide on a (max) end limit (make it much longer than the suspected amount of data to be gathered)
        start = int(start_point)
        max_record_range = 100
        end = start + max_record_range

        # Enqueue jobs
        for i in range(start, end):
            todo.put(Task(i))
        print('Processes joining')
        # wait for processes to join
        for p in consumers:
            p.join()
        print('Processes joined')

        # process results - UNFINISHED
        pass

        # return results - UNFINISHED
        return 'results!'


if __name__ == '__main__':

    # load start points:
    start_points = {'cat1': 1, 'cat2': 3, 'cat3': 4}


    master = Shepard()
    cat1 = master(start_points['cat1'])
    print('cat1 done')
    cat2 = master(start_points['cat2'])
    print('cat2 done')
    cat3 = master(start_points['cat3'])

Here is the full script:

import time
import requests
import sys
import json
import pandas as pd
import multiprocessing
import queue


class CompaniesHouseRequest:
    """Retreive information from Companies House"""
    def __init__(self, company, catagory_url=''):
        """Example URL: '/officers'"""
        self.company = str(company)
        self.catagory_url = str(catagory_url)

    def retrieve(self, key='Rn7RLDV9Tw9v4ShDCotjDtJFBgp1Lr4d-9GRYZMo'):
        """retrieve data from Companies House"""
        call = 'https://api.companieshouse.gov.uk/company/' + self.company + self.catagory_url
        retrieve_complete = False
        while retrieve_complete is False:
            resp = requests.get(call, auth=requests.auth.HTTPBasicAuth(key, ''))
            code = resp.status_code
            if code == 404:
                print(resp.status_code)
                raise NameError('Company not found')
            elif code == 200:
                try:
                    self.data = json.loads(resp.content.decode('UTF8'))
                    retrieve_complete = True
                except json.decoder.JSONDecodeError:
                    print('Decode Error in Officers!')
            else:
                print("Error:", sys.exc_info()[0])
                print('Retrying')
                time.sleep(5)
        return self.data


class Company:
    """Retrieve and hold company details"""
    def __init__(self, company_number):
        self.company_number = company_number

    def __call__(self):
        """Create request and process data"""
        # make request
        req = CompaniesHouseRequest(self.company_number)
        data = req.retrieve()
        # extract data
        try:
            line = [self.company_number,
                    data['company_name'],
                    data['registered_office_address'].get('premises', ''),
                    data['registered_office_address'].get('address_line_1', ''),
                    data['registered_office_address'].get('address_line_2', ''),
                    data['registered_office_address'].get('country', ''),
                    data['registered_office_address'].get('locality', ''),
                    data['registered_office_address'].get('postal_code', ''),
                    data['registered_office_address'].get('region', '')]
        except KeyError:
            line = ['' for i in range(0, 9)]
        # save as pandas dataframe
        return pd.DataFrame([line], columns=['company_number', 'company_name', 'company_address_premises',
                                             'company_address_line_1', 'company_address_line_2',
                                             'company_address_country', 'company_address_locality',
                                             'company_address_postcode', 'company_address_region'])


def name_splitter(name):
    split = name.split(', ')
    if len(split) > 2:
        return [split[2], split[1], split[0]]
    else:
        return ['', split[1], split[0]]


class Officers:
    """Retrieve and hold officers details"""
    def __init__(self, company_number):
        self.company_number = company_number

    def __call__(self):
        """Create request and process data"""
        # make request
        req = CompaniesHouseRequest(self.company_number, '/officers')
        data = req.retrieve()
        # extract data
        for officer in data['items']:
            if officer['officer_role'] == 'director':
                name = name_splitter(officer['name'])
                line = [name[0],
                        name[1],
                        name[2],
                        officer.get('occupation'),
                        officer.get('country_of_residence'),
                        officer.get('nationality'),
                        officer.get('appointed_on', ''),
                        officer['address'].get('premises', ''),
                        officer['address'].get('address_line_1', ''),
                        officer['address'].get('address_line_2', ''),
                        officer['address'].get('country', ''),
                        officer['address'].get('locality', ''),
                        officer['address'].get('postal_code', ''),
                        officer['address'].get('region', '')]
                break
        director_count = sum(map(lambda x: x['officer_role'] == 'director', data['items']))
        if director_count > 1:
            line += [True]
        elif director_count == 1:
            line += [False]
        else:
            line = ['no directors'] * 3 + [''] * 12
        return pd.DataFrame([line], columns=['title', 'first_name', 'surname', 'occupation', 'country_of_residence',
                                             'nationality', 'appointed_on',
                                             'address_premises', 'address_line_1', 'address_line_2',
                                             'address_country', 'address_locality', 'address_postcode',
                                             'address_region', 'multi_director'])


class Task:
    def __init__(self, prefix, company_number):
        self.prefix = prefix
        self.company_number = company_number

    def __call__(self):
        """Find officer and company data and combine and save it"""
        comp_id = self.prefix + str(self.company_number)
        print(comp_id)
        try:
            # initialise company class
            comp = Company(comp_id)
            # initialise officer class
            off = Officers(comp_id)
            # retrieve and concatenate
            self.result = pd.concat([comp(), off()], axis=1)

        except NameError:
            # become poison pill once latest is done
            self.result = None

    def output(self):
        return self.result


class Consumer(multiprocessing.Process):
    """Handle process and re-queue complete tasks"""
    def __init__(self, waiting_queue, complete_queue):
        multiprocessing.Process.__init__(self)
        self.waiting_queue = waiting_queue
        self.complete_queue = complete_queue

    def run(self):
        """process tasks until queue is empty"""
        proc_name = self.name
        while True:
            current_task = self.waiting_queue.get()
            current_task()
            if current_task.output() is None:
                print('{}: Exiting, poison pill reached'.format(proc_name))
                self.waiting_queue.task_done()
                break
            self.waiting_queue.task_done()
            self.complete_queue.put(current_task)
        print('{}: complete'.format(proc_name))


class Shepard:
    """Handle life of Consumers, Queues and Tasks"""
    def __init__(self):
        pass

    def __call__(self, prefix, start_point):

        # initialize queues
        todo = multiprocessing.JoinableQueue()
        finished = multiprocessing.JoinableQueue()

        # start consumers
        num_consumers = multiprocessing.cpu_count() * 2
        consumers = [Consumer(todo, finished) for i in range(num_consumers)]
        for q in consumers:
            q.start()

        # decide on (max) end limit
        start = int(start_point)
        max_record_range = 1000
        end = start + max_record_range

        # Enqueue jobs
        for i in range(start, end):
            todo.put(Task(prefix, i))
        print('Processes joining')

        # wait for processes to join
        for p in consumers:
            p.join()
        print('Processes joined')

        # process results - UNFINISHED
        pass

        # return results - UNFINISHED
        return 'results!'


if __name__ == '__main__':
    # paths to data
    data_directory = r'C:\Users\hdewinton\OneDrive - Advanced Payment Solutions\Python\Corporate DM\data'
    base = r'\base'

    # load start points:
    init = {"England": 10926071, "Scotland": 574309, "Ireland": 647561}

    # gather data for each category
    master = Shepard()
    ireland = master('NI', init['Ireland'])
    scotland = master('SC', init['Scotland'])
    england = master('', init['England'])

Har*_*ton 6

TL;DR - the consequence (getting stuck when the consumers fail to join) can be fixed by changing this:

finished = multiprocessing.JoinableQueue()

to this:

manager = multiprocessing.Manager()
finished = manager.Queue()

Details - "When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager." From the documentation.


One of those surprising consequences is triggered once a certain number of tasks have been added to the second (finished) queue. Below that limit there is no problem; above it, the consumers never join. It does not happen in the proof because the second queue, although it exists, is never used. The limit depends on the size and complexity of the Task objects, so I suspect it is related to the pickled data only being flushed once a certain volume has accumulated - that volume of data is what triggers this behaviour.
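The feeder-thread behaviour described in the documentation can be reproduced with a minimal sketch (my own illustration, not from the original post): a child process that puts more data on a multiprocessing.Queue than the underlying pipe can buffer will not terminate until the parent drains the queue. The names producer and demo, the 2-second join timeout, and the roughly 10 MB payload are all arbitrary choices; this assumes a platform where the child blocks at exit while its feeder thread still holds unflushed data (the standard CPython behaviour).

```python
import multiprocessing
import queue


def producer(q):
    """Put far more pickled data on the queue than the pipe can buffer."""
    for _ in range(10000):
        q.put('x' * 1000)   # roughly 10 MB in total


def demo():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    p.start()
    p.join(timeout=2)
    stuck = p.is_alive()    # True: the feeder thread still holds unflushed data
    # Drain the queue so the child's feeder thread can flush and exit.
    try:
        while True:
            q.get(timeout=1)
    except queue.Empty:
        pass
    p.join()                # now the child joins cleanly
    return stuck, p.is_alive()


if __name__ == '__main__':
    print(demo())
```

A manager queue avoids this because puts go through a separate server process rather than a per-process feeder thread, so the child has nothing left to flush when it exits.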


Addendum - once the fix is in place, another error appears: because the consumers terminate before the todo queue is empty, the pipe inside the queue object is left without a connection object to send its data to, and a pipe error is raised (it shows up as a WinError 232). Don't worry, though: the pipe error can be fixed by emptying the queue before the consumers exit. Just add this to the consumer class's run method:

        while not self.waiting_queue.empty():
            try:
                self.waiting_queue.get(timeout=0.001)
            except:
                pass
        self.waiting_queue.close()

This removes every remaining element from the queue. Make sure it comes after the main while loop; then no pipe error occurs, because each consumer empties the waiting queue before terminating.
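Putting both fixes together, a self-contained sketch of the corrected structure might look like the following (my own condensed version, not the original code: Task is the doubling toy from the proof, run_batch is a made-up stand-in for Shepard.__call__, and a single consumer is used to keep the output deterministic):

```python
import multiprocessing
import queue


class Task:
    """Toy task from the proof: doubles a number, and becomes a
    poison pill once the result reaches 8."""
    def __init__(self, number):
        self.number = number

    def __call__(self):
        self.result = self.number * 2
        if self.result >= 8:
            self.result = None      # poison pill

    def output(self):
        return self.result


class Consumer(multiprocessing.Process):
    def __init__(self, waiting_queue, complete_queue):
        multiprocessing.Process.__init__(self)
        self.waiting_queue = waiting_queue    # JoinableQueue of Tasks
        self.complete_queue = complete_queue  # manager.Queue() for results

    def run(self):
        while True:
            current_task = self.waiting_queue.get()
            current_task()
            self.waiting_queue.task_done()
            if current_task.output() is None:  # poison pill reached
                break
            self.complete_queue.put(current_task)
        # Drain leftovers so the queue's feeder thread can flush and the
        # pipe closes cleanly (the WinError 232 fix from the addendum).
        while not self.waiting_queue.empty():
            try:
                self.waiting_queue.get(timeout=0.001)
            except queue.Empty:
                pass
        self.waiting_queue.close()


def run_batch(start, end):
    todo = multiprocessing.JoinableQueue()
    manager = multiprocessing.Manager()
    finished = manager.Queue()      # the fix: a managed results queue
    consumer = Consumer(todo, finished)
    consumer.start()
    for i in range(start, end):
        todo.put(Task(i))
    consumer.join()                 # joins, since results go via the manager
    results = []
    while not finished.empty():
        results.append(finished.get().output())
    return results


if __name__ == '__main__':
    print(run_batch(1, 100))
```

With start=1, tasks 1-3 produce results 2, 4 and 6, task 4 becomes the pill, and the remaining tasks are drained before the consumer exits, so the join returns promptly.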


  • This solved my problem. But why does manager.Queue() work when Queue doesn't? (2 upvotes)