使用共享内存中的复杂对象对大型XML文件进行多进程处理

Bra*_*roy 7 python parallel-processing shared-memory python-3.x python-multiprocessing

我正在改进一个解析XML并对其子树进行分类和索引的程序.实际程序太大而无法显示在这里,所以我把它归结为一个最小的测试用例,显示我遇到的问题.

这个想法是:

  1. 逐个处理目录中的XML文件
  2. 并行处理文件中的所有alpino_ds节点
  3. 在此过程中,该进程需要对共享变量进行读/写访问,以便我们可以检查属性总共发生了多少次,或者跟踪文件句柄

请注意,在实际代码中还有一些注意事项:

  • simply returning new values per process and then merging them in the main thread seems inadvisable and presumably quite slow, because the actual data structures are dicts four levels deep consisting of dicts, sets, ints, and strings, as well as dict-to-filehandle mappings and Counter() objects;
  • I tried using threads (with ThreadPoolExecutor) and even though there was some gain (I calculated around 5% improvement in speed), this is not good enough for me;
  • the actual data I am working with can consist of XML files of more than 60GB, or up to 15 million alpino_ds tags per file. That is the main reason I want to run things in parallel - there is just so much data. That means that the nested objects get quite big as well, so merging/sharing these objects between processes may be a bottleneck in itself.

Example code:

from pathlib import Path
from collections import Counter
from copy import copy
from lxml import etree

import concurrent.futures


class XmlGrinder:
    """Extract ``node`` subtrees from ``alpino_ds`` sentences into per-category files.

    This is a minimal reproduction of a multiprocessing pitfall:
    ``_process_node`` is submitted to a ``ProcessPoolExecutor`` and updates
    ``self.pattern_counter`` and ``self.fhs`` inside the worker, but the
    instance in the parent process never observes those updates.
    """

    def __init__(self, m=1):
        """Store the worker count and the subtree size limits.

        :param m: number of worker processes. ``False`` is stored as ``1``;
            ``0`` is stored as ``None``; any other value is stored unchanged.
            The stored value is later passed as ``max_workers`` to
            ``ProcessPoolExecutor``.
        """
        # ``False == 0`` holds in Python, so the identity check has to run
        # before the ``m == 0`` comparison.
        if m is False:
            self.m = 1
        elif m == 0:
            self.m = None
        else:
            self.m = m

        # Upper limits checked for every subtree in _process_node():
        # max_a bounds the number of direct ``node`` children, max_b bounds
        # the size returned by ``subnode.iter('node')``.
        self.max_a = 7
        self.max_b = 1000

        # Input and output directories; assigned in grind().
        self.pdin = self.pdout = None
        # Per-file state; re-created for every input file in _grind_xml().
        self.pattern_counter = self.fhs = self.corpus = None

    def grind(self, din, dout):
        """Process every ``*.xml`` file directly inside *din*, one at a time.

        :param din: path of the directory holding the XML input files.
        :param dout: path of the directory under which the output is written.
        """
        self.pdin = Path(din)
        self.pdout = Path(dout)

        for file in self.pdin.glob('*.xml'):
            self._grind_xml(file)

    def _grind_xml(self, pfin):
        """Submit each ``alpino_ds`` element of one file to the process pool.

        Resets the per-file state, streams the file with ``iterparse``,
        submits one job per element that has an ``id`` attribute, then prints
        each job's result as it completes.

        :param pfin: ``Path`` of the XML file to process; its stem is used as
            the corpus name.
        """
        self.pattern_counter = Counter()
        # NOTE: ``filenames`` is not pre-declared in __init__; it first
        # exists after this assignment.
        self.filenames = set()
        self.fhs = {}
        self.corpus = pfin.stem

        with concurrent.futures.ProcessPoolExecutor(max_workers=self.m) as executor:
            jobs = []
            # Stream the file and only yield completed ``alpino_ds`` elements.
            context = etree.iterparse(str(pfin), tag='alpino_ds')

            for _, node in context:
                attrs = node.attrib
                # node has to have id
                if 'id' not in attrs:
                    continue

                # The element is handed over as a serialized string and
                # re-parsed in _process_node(); presumably because lxml
                # elements cannot be sent to another process directly.
                jobs.append(executor.submit(self._process_node, etree.tostring(node)))

                # Makes sure our memory usage is kept in check by getting rid of unused elements
                # Borrowed from https://stackoverflow.com/a/7171543/1150683
                node.clear()
                # Also eliminate now-empty references from the root node to elem
                for ancestor in node.xpath('ancestor-or-self::*'):
                    while ancestor.getprevious() is not None:
                        del ancestor.getparent()[0]

            # Get rid of xml iterator
            del context

            # Counts finished jobs; as_completed() yields in completion order,
            # so this is not the sentence's position in the file.
            sentence_nr = 0
            for job in concurrent.futures.as_completed(jobs):
                sentence_nr += 1
                print(f"Processed {self.corpus} sentence {sentence_nr:,d}", job.result(), flush=True)

            # self.* variables are empty! :-(
            # The workers updated their own copies of this instance, so the
            # state assigned at the top of this method is still untouched here.
            print('pattern counter:', self.pattern_counter)
            print('filenames:', self.filenames)
            print('filehandles:', self.fhs)

            # won't do anything because fh is empty:
            for fh in self.fhs.values():
                fh.close()

    def _process_node(self, xml_str):
        """Write the qualifying ``node`` subtrees of one sentence to disk.

        Runs inside a worker process. Every ``node`` element whose child count
        and subtree size pass the limits is wrapped in a new ``tree`` element
        and appended to the output file of its ``cat`` value.

        :param xml_str: serialized ``alpino_ds`` element.
        :return: the ``cat`` values of all accepted subtrees, concatenated in
            iteration order.
        :raises KeyError: if an accepted ``node`` has no ``cat`` attribute.
        """
        node = etree.fromstring(xml_str)

        all_cats = ''
        for subnode in node.iter('node'):
            children_size = sum(1 for _ in subnode.iterchildren('node'))
            # NOTE(review): ``iter('node')`` presumably also yields ``subnode``
            # itself, making this descendants + 1 -- confirm against lxml docs.
            descendants_size = sum(1 for _ in subnode.iter('node'))
            # Size requirements of children and descendants
            if children_size < 1 \
                    or self.max_a < children_size \
                    or descendants_size > self.max_b:
                continue

            # get attribute of node
            cat = subnode.attrib['cat']
            all_cats += cat
            # This increments the counter of the instance living in this
            # worker process; the parent's counter is not affected.
            self.pattern_counter[cat] += 1

            # Create new XML tree
            tree_xml = etree.Element('tree', {
                'index': f"{cat}-{self.pattern_counter[cat]}"
            })
            # Copy so the subtree is not moved out of the sentence being iterated.
            tree_xml.append(copy(subnode))

            # open filehandle and write new tree to file
            if cat not in self.fhs:
                (self.pdout / self.corpus).mkdir(exist_ok=True, parents=True)
                tree_filename = self.pdout / self.corpus / f"{self.corpus}-{cat}-trees.xml"
                # open file handle and keep it open, only close after loop
                # (the handle is stored on the worker's copy of ``self.fhs``,
                # so the closing loop in _grind_xml() never sees it)
                self.fhs[cat] = tree_filename.open(mode='a', encoding='utf-8')

            self.fhs[cat].write('\n\t\t' + etree.tostring(tree_xml, encoding='unicode'))

        return all_cats


if __name__ == '__main__':
    # Script entry point.
    # use m=[int] to enable multiple cores or m=0 to utilise all cores
    # (with no argument the grinder is configured for a single worker)
    xml_grindr = XmlGrinder()
    # First argument: directory containing the *.xml input files;
    # second argument: directory that receives the output.
    xml_grindr.grind(r'../data', r'../output')
Run Code Online (Sandbox Code Playgroud)

Sample XML (save it to an XML file and put it inside a directory; use that directory as the first argument of xml_grindr.grind()):

<?xml version="1.0" encoding="UTF-8"?><treebank><alpino_ds version="1.3" id="18.head.1.s.1"><node begin="0" cat="top" end="4" id="0" rel="top"><node begin="0" cat="conj" end="4" id="1" rel="--"><node begin="0" end="1" frame="within_word_conjunct" id="2" lcat="np" lemma="_" pos="prefix" postag="SPEC(afgebr)" pt="spec" rel="cnj" root="taal" sense="taal" spectype="afgebr" word="taal-"/><node begin="1" conjtype="neven" end="2" frame="conj(en)" id="3" lcat="vg" lemma="en" pos="vg" postag="VG(neven)" pt="vg" rel="crd" root="en" sense="en" word="en"/><node begin="2" cat="mwu" end="4" id="4" mwu_root="spraaktechnologienieuws jul&apos;03" mwu_sense="spraaktechnologienieuws jul&apos;03" rel="cnj"><node begin="2" end="3" frame="proper_name(both)" genus="onz" getal="ev" graad="basis" id="5" lcat="np" lemma="spraaktechnologienieuws" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,onz,stan)" pt="n" rel="mwp" root="spraaktechnologienieuws" sense="spraaktechnologienieuws" word="spraaktechnologienieuws"/><node begin="3" end="4" frame="proper_name(both)" id="6" lcat="np" lemma="_" num="both" pos="name" postag="SPEC(symb)" pt="spec" rel="mwp" root="jul&apos;03" sense="jul&apos;03" spectype="symb" word="jul&apos;03"/></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.2.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="noun(de,count,sg)" gen="de" id="1" lcat="np" lemma="1" num="sg" numtype="hoofd" pos="noun" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="1" sense="1" word="1"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." 
special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.2.s.2"><node begin="0" cat="top" end="5" id="0" rel="top"><node begin="0" cat="mwu" end="5" id="1" mwu_root="DISCUSSIE OVER TAALTECHNOLOGIE IN TAALSCHRIFT" mwu_sense="DISCUSSIE OVER TAALTECHNOLOGIE IN TAALSCHRIFT" rel="--"><node begin="0" buiging="met-e" end="1" frame="proper_name(both)" graad="basis" id="2" lcat="np" lemma="Discussie" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="DISCUSSIE" sense="DISCUSSIE" word="DISCUSSIE"/><node begin="1" end="2" frame="proper_name(both)" id="3" lcat="np" lemma="over" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="OVER" sense="OVER" vztype="init" word="OVER"/><node begin="2" end="3" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="4" lcat="np" lemma="taaltechnologie" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="TAALTECHNOLOGIE" sense="TAALTECHNOLOGIE" word="TAALTECHNOLOGIE"/><node begin="3" end="4" frame="proper_name(both)" id="5" lcat="np" lemma="iN" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="IN" sense="IN" vztype="init" word="IN"/><node begin="4" end="5" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="6" lcat="np" lemma="taalschrift" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="TAALSCHRIFT" sense="TAALSCHRIFT" word="TAALSCHRIFT"/></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.3.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="2" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="2" sense="2" special="hoofd" word="2"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." 
pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.3.s.2"><node begin="0" cat="top" end="7" id="0" rel="top"><node begin="0" cat="mwu" end="7" id="1" mwu_root="AMERIKAANSE OVERHEID KIEST VOOR LINKFACTORY VAN L&amp;C" mwu_sense="AMERIKAANSE OVERHEID KIEST VOOR LINKFACTORY VAN L&amp;C" rel="--"><node begin="0" buiging="met-e" end="1" frame="proper_name(both)" graad="basis" id="2" lcat="np" lemma="Amerikaans" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="AMERIKAANSE" sense="AMERIKAANSE" word="AMERIKAANSE"/><node begin="1" end="2" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="3" lcat="np" lemma="overheid" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="OVERHEID" sense="OVERHEID" word="OVERHEID"/><node begin="2" end="3" frame="proper_name(both)" id="4" lcat="np" lemma="kiezen" num="both" pos="name" postag="WW(pv,tgw,met-t)" pt="ww" pvagr="met-t" pvtijd="tgw" rel="mwp" root="KIEST" sense="KIEST" word="KIEST" wvorm="pv"/><node begin="3" end="4" frame="proper_name(both)" id="5" lcat="np" lemma="voor" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="VOOR" sense="VOOR" vztype="init" word="VOOR"/><node begin="4" conjtype="neven" end="5" frame="proper_name(both)" id="6" lcat="np" lemma="linkfactory" num="both" pos="name" postag="VG(neven)" pt="vg" rel="mwp" root="LINKFACTORY" sense="LINKFACTORY" word="LINKFACTORY"/><node begin="5" end="6" frame="proper_name(both)" id="7" lcat="np" lemma="van" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="VAN" sense="VAN" vztype="init" word="VAN"/><node begin="6" end="7" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="8" lcat="np" lemma="l&amp;amp;C" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" 
rel="mwp" root="L&amp;C" sense="L&amp;C" word="L&amp;C"/></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.4.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="3" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="3" sense="3" special="hoofd" word="3"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.4.s.2"><node begin="0" cat="top" end="6" id="0" rel="top"><node begin="0" cat="np" end="6" id="1" rel="--"><node begin="0" buiging="met-e" end="1" frame="noun(both,both,both)" gen="both" graad="basis" id="2" lcat="np" lemma="Spraakgestuurde" naamval="stan" num="both" pos="noun" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="hd" root="spraakgestuurde" sense="spraakgestuurde" word="SPRAAKGESTUURDE"/><node begin="1" cat="mwu" end="6" id="3" mwu_root="LAST-MINUTE TAALCURSUS VIA DE TELEFOON" mwu_sense="LAST-MINUTE TAALCURSUS VIA DE TELEFOON" rel="app"><node begin="1" buiging="met-e" end="2" frame="proper_name(both)" graad="basis" id="4" lcat="np" lemma="Last-minuat" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="LAST-MINUTE" sense="LAST-MINUTE" word="LAST-MINUTE"/><node begin="2" end="3" frame="proper_name(both)" getal="mv" graad="basis" id="5" lcat="np" lemma="taalcursus" ntype="soort" num="both" pos="name" postag="N(soort,mv,basis)" pt="n" rel="mwp" root="TAALCURSUS" sense="TAALCURSUS" word="TAALCURSUS"/><node begin="3" end="4" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="6" lcat="np" lemma="Via" naamval="stan" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,zijd,stan)" pt="n" rel="mwp" root="VIA" sense="VIA" word="VIA"/><node 
begin="4" end="5" frame="proper_name(both)" id="7" lcat="np" lemma="dE" lwtype="bep" naamval="stan" npagr="rest" num="both" pos="name" postag="LID(bep,stan,rest)" pt="lid" rel="mwp" root="DE" sense="DE" word="DE"/><node begin="5" end="6" frame="proper_name(both)" getal="mv" graad="basis" id="8" lcat="np" lemma="telefoon" ntype="soort" num="both" pos="name" postag="N(soort,mv,basis)" pt="n" rel="mwp" root="TELEFOON" sense="TELEFOON" word="TELEFOON"/></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.5.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="4" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="4" sense="4" special="hoofd" word="4"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.5.s.2"><node begin="0" cat="top" end="5" id="0" rel="top"><node begin="0" cat="np" end="5" id="1" rel="--"><node begin="0" end="1" frame="noun(de,count,pl)" gen="de" id="2" lcat="np" lemma="_" num="pl" pos="noun" postag="SPEC(deeleigen)" pt="spec" rel="hd" root="aio_vacature" sense="aio_vacature" spectype="deeleigen" word="AIO-VACATURES"/><node begin="1" cat="pp" end="5" id="3" rel="mod"><node begin="1" end="2" frame="preposition(in,[])" id="4" lcat="pp" lemma="iN" pos="prep" postag="VZ(init)" pt="vz" rel="hd" root="in" sense="in" vztype="init" word="IN"/><node begin="2" cat="conj" end="5" id="5" rel="obj1"><node begin="2" end="3" frame="proper_name(both,'LOC')" genus="onz" getal="ev" graad="basis" id="6" lcat="np" lemma="Tilburg" naamval="stan" neclass="LOC" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="cnj" root="TILBURG" sense="TILBURG" word="TILBURG"/><node begin="3" conjtype="neven" end="4" frame="conj(en)" id="7" 
lcat="vg" lemma="eN" pos="vg" postag="VG(neven)" pt="vg" rel="crd" root="en" sense="en" word="EN"/><node begin="4" end="5" frame="proper_name(both,'LOC')" genus="onz" getal="ev" graad="basis" id="8" lcat="np" lemma="Amsterdam" naamval="stan" neclass="LOC" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="cnj" root="AMSTERDAM" sense="AMSTERDAM" word="AMSTERDAM"/></node></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.p.1.s.1"><node begin="0" cat="top" end="7" id="0" rel="top"><node begin="0" cat="smain" end="7" id="1" rel="--"><node begin="0" end="1" frame="noun(de,count,pl)" gen="de" getal="mv" graad="basis" id="2" index="1" lcat="np" lemma="computer" ntype="soort" num="pl" pos="noun" postag="N(soort,mv,basis)" pt="n" rel="su" root="computer" sense="computer" word="Computers"/><node begin="1" end="2" frame="verb(hebben,pl,aux(te_inf))" id="3" infl="pl" lcat="smain" lemma="hoeven" pos="verb" postag="WW(pv,tgw,mv)" pt="ww" pvagr="mv" pvtijd="tgw" rel="hd" root="hoef" sc="aux(te_inf)" sense="hoef" tense="present" word="hoeven" wvorm="pv"/><node begin="0" cat="ti" end="7" id="4" rel="vc"><node begin="4" end="5" frame="complementizer(te)" id="5" lcat="cp" lemma="te" pos="comp" postag="VZ(init)" pt="vz" rel="cmp" root="te" sc="te" sense="te" vztype="init" word="te"/><node begin="0" cat="inf" end="7" id="6" rel="body"><node begin="0" end="1" id="7" index="1" rel="su"/><node begin="5" buiging="zonder" end="6" frame="verb('hebben/zijn',inf,aux(inf))" id="8" infl="inf" lcat="inf" lemma="kunnen" pos="verb" positie="vrij" postag="WW(inf,vrij,zonder)" pt="ww" rel="hd" root="kan" sc="aux(inf)" sense="kan" word="kunnen" wvorm="inf"/><node begin="0" cat="inf" end="7" id="9" rel="vc"><node begin="0" end="1" id="10" index="1" rel="su"/><node begin="2" cat="np" end="4" id="11" rel="obj1"><node begin="2" buiging="zonder" end="3" frame="determiner(geen,nwh,mod,pro,yparg,nwkpro,geen)" id="12" infl="geen" lcat="detp" lemma="geen" 
naamval="stan" npagr="agr" pdtype="det" pos="det" positie="prenom" postag="VNW(onbep,det,stan,prenom,zonder,agr)" pt="vnw" rel="det" root="geen" sense="geen" vwtype="onbep" wh="nwh" word="geen"/><node begin="3" end="4" frame="noun(het,mass,sg)" gen="het" genus="onz" getal="ev" graad="basis" id="13" lcat="np" lemma="Nederlands" naamval="stan" ntype="eigen" num="sg" pos="noun" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="hd" root="Nederlands" sense="Nederlands" word="Nederlands"/></node><node begin="6" buiging="zonder" end="7" frame="verb(hebben,inf(no_e),transitive)" id="14" infl="inf(no_e)" lcat="inf" lemma="verstaan" pos="verb" positie="vrij" postag="WW(inf,vrij,zonder)" pt="ww" rel="hd" root="versta" sc="transitive" sense="versta" word="verstaan" wvorm="inf"/></node></node></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.p.2.s.2"><node begin="0" cat="top" end="14" id="0" rel="top"><node begin="7" end="8" frame="punct(komma)" id="1" lcat="punct" lemma="," pos="punct" postag="LET()" pt="let" rel="--" root="," sense="," special="komma" word=","/><node begin="10" end="11" frame="punct(komma)" id="2" lcat="punct" lemma="," pos="punct" postag="LET()" pt="let" rel="--" root="," sense="," special="komma" word=","/><node begin="0" cat="smain" end="13" id="3" rel="--"><node begin="0" end="1" frame="er_adverb(voor)" id="4" lcat="pp" lemma="daarvoor" pos="pp" postag="BW()" pt="bw" rel="mod" root="daarvoor" sense="daarvoor" special="er" word="Daarvoor"/><node begin="1" end="2" frame="verb(unacc,sg3,intransitive)" id="5" infl="sg3" lcat="smain" lemma="verlopen" pos="verb" postag="WW(pv,tgw,met-t)" pt="ww" pvagr="met-t" pvtijd="tgw" rel="hd" root="verloop" sc="intransitive" sense="verloop" tense="present" word="verloopt" wvorm="pv"/><node begin="2" cat="np" end="5" id="6" rel="su"><node begin="2" end="3" frame="determiner(de)" id="7" infl="de" lcat="detp" lemma="de" lwtype="bep" naamval="stan" npagr="rest" pos="det" postag="LID(bep,stan,rest)" pt="lid" 
rel="det" root="de" sense="de" word="de"/><node aform="base" begin="3" buiging="met-e" end="4" frame="adjective(e)" graad="basis" id="8" infl="e" lcat="ap" lemma="menselijk" naamval="stan" pos="adj" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mod" root="menselijk" sense="menselijk" vform="adj" word="menselijke"/><node begin="4" end="5" frame="noun(de,count,sg)" gen="de" genus="zijd" getal="ev" graad="basis" id="9" lcat="np" lemma="communicatie" naamval="stan" ntype="soort" num="sg" pos="noun" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="hd" root="communicatie" sense="communicatie" word="communicatie"/></node><node begin="5" cat="ap" end="7" id="10" rel="mod"><node begin="5" end="6" frame="intensifier" id="11" lcat="advp" lemma="te" pos="adv" postag="BW()" pt="bw" rel="mod" root="te" sense="te" special="intensifier" word="te"/><node aform="base" begin="6" buiging="zonder" end="7" frame="adjective(no_e(adv))" graad="basis" id="12" infl="no_e" lcat="ap" lemma="subtiel" pos="adj" positie="vrij" postag="ADJ(vrij,basis,zonder)" pt="adj" rel="hd" root="subtiel" sense="subtiel" vform="adj" word="subtiel"/></node><node begin="8" cat="ppart" end="10" id="13" rel="mod"><node begin="8" end="9" frame="intensifier" id="14" lcat="advp" lemma="te" pos="adv" postag="VZ(init)" pt="vz" rel="mod" root="te" sense="te" special="intensifier" vztype="init" word="te"/><node aform="base" begin="9" buiging="zonder" end="10" frame="adjective(ge_no_e(adv))" id="15" infl="no_e" lcat="ppart" lemma="nuanceren" pos="adj" positie="vrij" postag="WW(vd,vrij,zonder)" pt="ww" rel="hd" root="genuanceerd" sense="genuanceerd" vform="psp" word="genuanceerd" wvorm="vd"/></node><node begin="11" cat="ap" end="13" id="16" rel="mod"><node begin="11" end="12" frame="intensifier" id="17" lcat="advp" lemma="te" pos="adv" postag="BW()" pt="bw" rel="mod" root="te" sense="te" special="intensifier" word="te"/><node aform="base" begin="12" buiging="zonder" end="13" 
frame="adjective(no_e(adv))" graad="basis" id="18" infl="no_e" lcat="ap" lemma="rijk" pos="adj" positie="vrij" postag="ADJ(vrij,basis,zonder)" pt="adj" rel="hd" root="rijk" sense="rijk" vform="adj" word="rijk"/></node></node><node begin="13" end="14" frame="punct(punt)" id="19" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds></treebank>
Run Code Online (Sandbox Code Playgroud)

Pipfile, in case you wish to set up a local environment for testing (the above script probably works with 3.4 and up, though):

[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
lxml = ">=4.2.1"

[dev-packages]

[requires]
python_version = "3.6"
Run Code Online (Sandbox Code Playgroud)

Depending on how you named the XML, the output of the script will be something as follows:

Processed WRPEE-dummy sentence 1 topconjmwu
Processed WRPEE-dummy sentence 2 top
Processed WRPEE-dummy sentence 3 topmwu
Processed WRPEE-dummy sentence 4 top
Processed WRPEE-dummy sentence 5 topmwu
Processed WRPEE-dummy sentence 6 top
Processed WRPEE-dummy sentence 7 topnpmwu
Processed WRPEE-dummy sentence 8 top
Processed WRPEE-dummy sentence 9 topnpppconj
Processed WRPEE-dummy sentence 10 topsmaintiinfinfnp
Processed WRPEE-dummy sentence 11 topsmainnpapppartap
pattern counter: Counter()
filenames: set()
filehandles: {}
Run Code Online (Sandbox Code Playgroud)

This shows that the variables that I want to be shared between processes are indeed not shared or modified. (I am aware that this happens because the process and its variables are forked.) In this dummy example it may not seem very important (even though it is clear that now I can't close the opened file handles), but in the large program these and other variables are modified and used inside the forked processes. The question is, then, how to make that work.

I read about Pipe() and Queue() and it seems to me that I would need Queue(). In addition, because I will be reading and writing very often from the different processes to the same object I think I need a Manager() as well. This is the part that I am uncertain about, however. I tried reading this topic but it only confused me more. Furthermore, the comments suggest that even on 3.6.4 there may be issues when using complex objects. Remember that the actual data that I am sharing is nested and consists of different types.

The question, in summary, thus is: how can I re-write the above example code to ensure that all Processes have (non-blocking) read/write access to instance variables inside _process_node and methods that are called from within that process? I am willing to update my current Python version (3.6.4) to 3.7, and to use additional libraries.

Mar*_*ers 6

multiprocessing库允许您在并发Python代码中使用并行性.如果没有multiprocessing,Python的GIL会妨碍真正的并行执行,但是您应该把multiprocessing代码与其他并发技术同等看待.基本上,multiprocessing与线程之间的最大区别是状态是通过较慢的IPC调用共享的.

这意味着您需要仔细处理共享资源.您当前的实施并没有做得很好; 您有多个并发任务访问共享资源,而不考虑其他人可能正在做什么.代码中存在多种竞争条件,其中多个任务可以写入同一文件,或者嵌套数据结构更新而不考虑其他更新.

当您必须更新共享数据结构或文件时,通常可以在两个选项之间进行选择:

  • 使用同步; 任何需要改变资源的人都需要首先获得共享锁,或者使用其他形式的同步原语来协调访问.
  • 使一个任务负责改变资源.这通常涉及一个或多个队列.

请注意,您必须明确地将这些对象(同步原语或队列)中的任何一个传递给子进程,请参阅编程指南 ; 不要在实例上使用引用来共享状态.

对于你的情况,我会选择排队和专门的任务; 您的瓶颈是数据处理,将数据写入磁盘并使用分析任务的结果更新一些数据结构相对较快.

所以使用单个任务写入文件; 只需将序列化的XML字符串与cat值一起放入专用队列,并有一个单独的任务将这些从队列中拉出来并将它们写入文件.然后,这个单独的任务负责所有文件访问,包括打开和关闭.这序列化文件访问并消除了竞争条件和破坏写入的可能性.如果这些文件的数据如此密集和快速以至于使此任务成为瓶颈,则为每个目标文件创建任务.

对共享数据结构执行相同操作; 将突变发送到队列,将其留给专用任务来合并数据.更新代理对象并不合适,因为它们的更改通过RPC调用传播到其他进程,增加了竞争条件的可能性,并且锁定不能保证数据在所有任务进程中保持一致!

对于您的简单示例,对Counter()对象的更新实际上并不会被共享; 每个子进程在分叉时都会继承一份副本,并且只更新这份本地副本,父进程永远不会看到所做的更改.因此,您应该使用一个本地的新Counter()实例,并将其推送到队列中.然后,专用任务可以从队列接收这些内容,并通过total_counter.update(queued_counter)用这些值更新自己的本地Counter()实例,再次确保更新是串行化的.

为了说明,这是一个对Lorem Ipsum数据进行词频统计的人为例子; 一系列count_words任务执行计数,但将它们生成的Counter()对象传递给队列,由单独的整理任务合并成最终的单词计数.单独的日志记录任务将数据从日志记录队列写入磁盘:

import datetime
import os
import random
import re
import time

from collections import Counter
from functools import partial
from io import TextIOWrapper
from multiprocessing import Manager, Pool
from urllib.request import urlopen

# Sentinel value: putting it on a queue tells the task consuming that queue
# to stop reading and finish.
COMPLETE = "COMPLETE"

def collating_task(countsqueue, logqueue):
    """Merge per-line word counts from a queue into one overall tally.

    Keeps reading items from *countsqueue* and folding them into a single
    ``Counter`` until the ``COMPLETE`` sentinel arrives; a progress message
    is put on *logqueue* after every merge.

    :param countsqueue: queue delivering ``Counter`` objects, terminated by
        ``COMPLETE``.
    :param logqueue: queue that receives one log message per merged item.
    :return: the combined ``Counter``.
    """
    totals = Counter()

    while True:
        batch = countsqueue.get()
        if batch == COMPLETE:
            # sentinel seen: nothing more will arrive
            break
        totals.update(batch)
        logqueue.put(
            f"collating: updating with {len(batch)} words "
            f"(total {len(totals)})"
        )

    return totals

def logging_task(logqueue):
    """Write every message from *logqueue* to ``logfile.txt`` with a timestamp.

    The file is truncated on start. Reading stops, and the file is closed,
    once the ``COMPLETE`` sentinel is taken from the queue.

    :param logqueue: queue delivering log messages, terminated by ``COMPLETE``.
    """
    with open('logfile.txt', 'w') as logf:
        while True:
            message = logqueue.get()
            if message == COMPLETE:
                # sentinel seen: stop logging
                break
            # flush after each line so the file is current while tasks run
            print(datetime.datetime.now(), message, flush=True, file=logf)

def count_words(line, countsqueue, logqueue):
    r"""Count the words in one line of text and hand the tally to the collator.

    A word is any run of ``\w`` characters. A log message is queued first,
    then the task pauses briefly, and finally the tally is queued.

    :param line: the text to analyse.
    :param countsqueue: queue that receives the resulting ``Counter``.
    :param logqueue: queue that receives the log message.
    """
    tally = Counter(re.findall(r"\w+", line))
    logqueue.put(f"counting: counted {len(tally)} words")
    # a short random pause stands in for genuinely heavy work
    pause = random.uniform(0.0, 0.05)
    time.sleep(pause)
    countsqueue.put(tally)

def main():
    """Count the words of random Lorem Ipsum text with a pool of processes.

    Downloads the text, fans the lines out to ``count_words`` tasks, and lets
    one dedicated ``collating_task`` merge the counts and one dedicated
    ``logging_task`` write the log file. Prints the number of distinct words
    and the five most common ones.

    :raises urllib.error.URLError: if the text cannot be downloaded.
    :raises Exception: any exception raised inside the collating or logging
        task is re-raised here.
    """
    # Random latin text, 1000 paragraphs. The response is closed as soon as
    # its lines are read instead of being left to the garbage collector.
    with urlopen("https://loripsum.net/api/1000/long/plaintext") as loripsum_response:
        text = list(TextIOWrapper(loripsum_response, encoding="utf8"))
    print(f"Will process {len(text)} lines of data")

    # create managed queues that can be passed in as arguments. The alternative
    # is to create globals or Process() objects with queues passed in.
    manager = Manager()
    countsqueue, logqueue = manager.Queue(), manager.Queue()

    # The collator and the logger each hold on to one pool worker until they
    # receive COMPLETE. With a default-sized Pool() on a machine with one or
    # two cores no worker would be left for count_words and pool.map() would
    # block forever, so two extra workers are reserved for them.
    worker_count = (os.cpu_count() or 1) + 2
    with Pool(processes=worker_count) as pool:
        # start processing tasks, these loop forever until signalled
        collator = pool.apply_async(collating_task, (countsqueue, logqueue))
        logger = pool.apply_async(logging_task, (logqueue,))

        # process lines, blocks until complete
        pool.map(partial(count_words, countsqueue=countsqueue, logqueue=logqueue), text)

        # All counts are queued by now; tell the collator to finish first,
        # because it still writes to the log queue while it drains.
        countsqueue.put(COMPLETE)
        wordcounts = collator.get()
        logqueue.put(COMPLETE)
        # get() instead of wait(): also re-raises an exception from the
        # logging task rather than dropping it silently
        logger.get()

    print(f"Counted {len(wordcounts)} different words; top 5 is:")
    for word, count in wordcounts.most_common(5):
        print(f'{word:<10} {count:4d}')

# Run the demo only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

它产生的东西如下:

Will process 2000 lines of data
Counted 5651 different words; top 5 is:
et         2078
in         2074
est        2036
non        1911
ut         1477
Run Code Online (Sandbox Code Playgroud)

并生成一个logfile.txt文件,其中包含被推入日志队列的消息.