Bra*_*roy 7 python parallel-processing shared-memory python-3.x python-multiprocessing
我正在改进一个解析XML并对其子树进行分类和索引的程序.实际程序太大而无法显示在这里,所以我把它归结为一个最小的测试用例,显示我遇到的问题.
这个想法是:
alpino_ds并行处理文件中的所有节点请注意,在实际代码中还有一些注意事项:
dicts of four levels deep consisting of dicts, sets, ints, and strings, as well as dict-to-filehandle, and Counter() objects;ThreadPoolExecutor) and even though there was some gain (I calculated around 5% improvement in speed), this is not good enough for me;alpino_ds tags per file. That is the main reason I want to run things in parallel - there is just so much data. That means that the nested objects get quite big as well, so merging/sharing these objects between processes may be a bottleneck in itself.Example code:
from pathlib import Path
from collections import Counter
from copy import copy
from lxml import etree
import concurrent.futures
class XmlGrinder:
def __init__(self, m=1):
if m is False:
self.m = 1
elif m == 0:
self.m = None
else:
self.m = m
self.max_a = 7
self.max_b = 1000
self.pdin = self.pdout = None
self.pattern_counter = self.fhs = self.corpus = None
def grind(self, din, dout):
self.pdin = Path(din)
self.pdout = Path(dout)
for file in self.pdin.glob('*.xml'):
self._grind_xml(file)
def _grind_xml(self, pfin):
self.pattern_counter = Counter()
self.filenames = set()
self.fhs = {}
self.corpus = pfin.stem
with concurrent.futures.ProcessPoolExecutor(max_workers=self.m) as executor:
jobs = []
context = etree.iterparse(str(pfin), tag='alpino_ds')
for _, node in context:
attrs = node.attrib
# node has to have id
if 'id' not in attrs:
continue
jobs.append(executor.submit(self._process_node, etree.tostring(node)))
# Makes sure our memory usage is kept in check by getting rid of unused elements
# Borrowed from https://stackoverflow.com/a/7171543/1150683
node.clear()
# Also eliminate now-empty references from the root node to elem
for ancestor in node.xpath('ancestor-or-self::*'):
while ancestor.getprevious() is not None:
del ancestor.getparent()[0]
# Get rid of xml iterator
del context
sentence_nr = 0
for job in concurrent.futures.as_completed(jobs):
sentence_nr += 1
print(f"Processed {self.corpus} sentence {sentence_nr:,d}", job.result(), flush=True)
# self.* variables are empty! :-(
print('pattern counter:', self.pattern_counter)
print('filenames:', self.filenames)
print('filehandles:', self.fhs)
# won't do anything because fh is empty:
for fh in self.fhs.values():
fh.close()
def _process_node(self, xml_str):
node = etree.fromstring(xml_str)
all_cats = ''
for subnode in node.iter('node'):
children_size = sum(1 for _ in subnode.iterchildren('node'))
descendants_size = sum(1 for _ in subnode.iter('node'))
# Size requirements of children and descendants
if children_size < 1 \
or self.max_a < children_size \
or descendants_size > self.max_b:
continue
# get attribute of node
cat = subnode.attrib['cat']
all_cats += cat
self.pattern_counter[cat] += 1
# Create new XML tree
tree_xml = etree.Element('tree', {
'index': f"{cat}-{self.pattern_counter[cat]}"
})
tree_xml.append(copy(subnode))
# open filehandle and write new tree to file
if cat not in self.fhs:
(self.pdout / self.corpus).mkdir(exist_ok=True, parents=True)
tree_filename = self.pdout / self.corpus / f"{self.corpus}-{cat}-trees.xml"
# open file handle and keep it open, only close after loop
self.fhs[cat] = tree_filename.open(mode='a', encoding='utf-8')
self.fhs[cat].write('\n\t\t' + etree.tostring(tree_xml, encoding='unicode'))
return all_cats
if __name__ == '__main__':
# use m=[int] to enable multiple cores or m=0 to utilise all cores
xml_grindr = XmlGrinder()
xml_grindr.grind(r'../data', r'../output')
Run Code Online (Sandbox Code Playgroud)
Sample XML (save it to an XML file and put it inside a directory; use that directory as the first argument of xml_grindr.grind()):
<?xml version="1.0" encoding="UTF-8"?><treebank><alpino_ds version="1.3" id="18.head.1.s.1"><node begin="0" cat="top" end="4" id="0" rel="top"><node begin="0" cat="conj" end="4" id="1" rel="--"><node begin="0" end="1" frame="within_word_conjunct" id="2" lcat="np" lemma="_" pos="prefix" postag="SPEC(afgebr)" pt="spec" rel="cnj" root="taal" sense="taal" spectype="afgebr" word="taal-"/><node begin="1" conjtype="neven" end="2" frame="conj(en)" id="3" lcat="vg" lemma="en" pos="vg" postag="VG(neven)" pt="vg" rel="crd" root="en" sense="en" word="en"/><node begin="2" cat="mwu" end="4" id="4" mwu_root="spraaktechnologienieuws jul'03" mwu_sense="spraaktechnologienieuws jul'03" rel="cnj"><node begin="2" end="3" frame="proper_name(both)" genus="onz" getal="ev" graad="basis" id="5" lcat="np" lemma="spraaktechnologienieuws" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,onz,stan)" pt="n" rel="mwp" root="spraaktechnologienieuws" sense="spraaktechnologienieuws" word="spraaktechnologienieuws"/><node begin="3" end="4" frame="proper_name(both)" id="6" lcat="np" lemma="_" num="both" pos="name" postag="SPEC(symb)" pt="spec" rel="mwp" root="jul'03" sense="jul'03" spectype="symb" word="jul'03"/></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.2.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="noun(de,count,sg)" gen="de" id="1" lcat="np" lemma="1" num="sg" numtype="hoofd" pos="noun" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="1" sense="1" word="1"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.2.s.2"><node begin="0" cat="top" end="5" id="0" rel="top"><node begin="0" cat="mwu" end="5" id="1" mwu_root="DISCUSSIE OVER TAALTECHNOLOGIE IN TAALSCHRIFT" mwu_sense="DISCUSSIE OVER TAALTECHNOLOGIE IN TAALSCHRIFT" rel="--"><node begin="0" buiging="met-e" end="1" frame="proper_name(both)" graad="basis" id="2" lcat="np" lemma="Discussie" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="DISCUSSIE" sense="DISCUSSIE" word="DISCUSSIE"/><node begin="1" end="2" frame="proper_name(both)" id="3" lcat="np" lemma="over" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="OVER" sense="OVER" vztype="init" word="OVER"/><node begin="2" end="3" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="4" lcat="np" lemma="taaltechnologie" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="TAALTECHNOLOGIE" sense="TAALTECHNOLOGIE" word="TAALTECHNOLOGIE"/><node begin="3" end="4" frame="proper_name(both)" id="5" lcat="np" lemma="iN" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="IN" sense="IN" vztype="init" word="IN"/><node begin="4" end="5" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="6" lcat="np" lemma="taalschrift" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="TAALSCHRIFT" sense="TAALSCHRIFT" word="TAALSCHRIFT"/></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.3.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="2" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="2" sense="2" special="hoofd" word="2"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.3.s.2"><node begin="0" cat="top" end="7" id="0" rel="top"><node begin="0" cat="mwu" end="7" id="1" mwu_root="AMERIKAANSE OVERHEID KIEST VOOR LINKFACTORY VAN L&C" mwu_sense="AMERIKAANSE OVERHEID KIEST VOOR LINKFACTORY VAN L&C" rel="--"><node begin="0" buiging="met-e" end="1" frame="proper_name(both)" graad="basis" id="2" lcat="np" lemma="Amerikaans" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="AMERIKAANSE" sense="AMERIKAANSE" word="AMERIKAANSE"/><node begin="1" end="2" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="3" lcat="np" lemma="overheid" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="OVERHEID" sense="OVERHEID" word="OVERHEID"/><node begin="2" end="3" frame="proper_name(both)" id="4" lcat="np" lemma="kiezen" num="both" pos="name" postag="WW(pv,tgw,met-t)" pt="ww" pvagr="met-t" pvtijd="tgw" rel="mwp" root="KIEST" sense="KIEST" word="KIEST" wvorm="pv"/><node begin="3" end="4" frame="proper_name(both)" id="5" lcat="np" lemma="voor" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="VOOR" sense="VOOR" vztype="init" word="VOOR"/><node begin="4" conjtype="neven" end="5" frame="proper_name(both)" id="6" lcat="np" lemma="linkfactory" num="both" pos="name" postag="VG(neven)" pt="vg" rel="mwp" root="LINKFACTORY" sense="LINKFACTORY" word="LINKFACTORY"/><node begin="5" end="6" frame="proper_name(both)" id="7" lcat="np" lemma="van" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="VAN" sense="VAN" vztype="init" word="VAN"/><node begin="6" end="7" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="8" lcat="np" lemma="l&amp;C" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="L&C" sense="L&C" word="L&C"/></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.4.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="3" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="3" sense="3" special="hoofd" word="3"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.4.s.2"><node begin="0" cat="top" end="6" id="0" rel="top"><node begin="0" cat="np" end="6" id="1" rel="--"><node begin="0" buiging="met-e" end="1" frame="noun(both,both,both)" gen="both" graad="basis" id="2" lcat="np" lemma="Spraakgestuurde" naamval="stan" num="both" pos="noun" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="hd" root="spraakgestuurde" sense="spraakgestuurde" word="SPRAAKGESTUURDE"/><node begin="1" cat="mwu" end="6" id="3" mwu_root="LAST-MINUTE TAALCURSUS VIA DE TELEFOON" mwu_sense="LAST-MINUTE TAALCURSUS VIA DE TELEFOON" rel="app"><node begin="1" buiging="met-e" end="2" frame="proper_name(both)" graad="basis" id="4" lcat="np" lemma="Last-minuat" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="LAST-MINUTE" sense="LAST-MINUTE" word="LAST-MINUTE"/><node begin="2" end="3" frame="proper_name(both)" getal="mv" graad="basis" id="5" lcat="np" lemma="taalcursus" ntype="soort" num="both" pos="name" postag="N(soort,mv,basis)" pt="n" rel="mwp" root="TAALCURSUS" sense="TAALCURSUS" word="TAALCURSUS"/><node begin="3" end="4" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="6" lcat="np" lemma="Via" naamval="stan" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,zijd,stan)" pt="n" rel="mwp" root="VIA" sense="VIA" word="VIA"/><node begin="4" end="5" frame="proper_name(both)" id="7" lcat="np" lemma="dE" lwtype="bep" naamval="stan" npagr="rest" num="both" pos="name" postag="LID(bep,stan,rest)" pt="lid" rel="mwp" root="DE" sense="DE" word="DE"/><node begin="5" end="6" frame="proper_name(both)" getal="mv" graad="basis" id="8" lcat="np" lemma="telefoon" ntype="soort" num="both" pos="name" postag="N(soort,mv,basis)" pt="n" rel="mwp" root="TELEFOON" sense="TELEFOON" word="TELEFOON"/></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.5.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="4" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="4" sense="4" special="hoofd" word="4"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.5.s.2"><node begin="0" cat="top" end="5" id="0" rel="top"><node begin="0" cat="np" end="5" id="1" rel="--"><node begin="0" end="1" frame="noun(de,count,pl)" gen="de" id="2" lcat="np" lemma="_" num="pl" pos="noun" postag="SPEC(deeleigen)" pt="spec" rel="hd" root="aio_vacature" sense="aio_vacature" spectype="deeleigen" word="AIO-VACATURES"/><node begin="1" cat="pp" end="5" id="3" rel="mod"><node begin="1" end="2" frame="preposition(in,[])" id="4" lcat="pp" lemma="iN" pos="prep" postag="VZ(init)" pt="vz" rel="hd" root="in" sense="in" vztype="init" word="IN"/><node begin="2" cat="conj" end="5" id="5" rel="obj1"><node begin="2" end="3" frame="proper_name(both,'LOC')" genus="onz" getal="ev" graad="basis" id="6" lcat="np" lemma="Tilburg" naamval="stan" neclass="LOC" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="cnj" root="TILBURG" sense="TILBURG" word="TILBURG"/><node begin="3" conjtype="neven" end="4" frame="conj(en)" id="7" lcat="vg" lemma="eN" pos="vg" postag="VG(neven)" pt="vg" rel="crd" root="en" sense="en" word="EN"/><node begin="4" end="5" frame="proper_name(both,'LOC')" genus="onz" getal="ev" graad="basis" id="8" lcat="np" lemma="Amsterdam" naamval="stan" neclass="LOC" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="cnj" root="AMSTERDAM" sense="AMSTERDAM" word="AMSTERDAM"/></node></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.p.1.s.1"><node begin="0" cat="top" end="7" id="0" rel="top"><node begin="0" cat="smain" end="7" id="1" rel="--"><node begin="0" end="1" frame="noun(de,count,pl)" gen="de" getal="mv" graad="basis" id="2" index="1" lcat="np" lemma="computer" ntype="soort" num="pl" pos="noun" postag="N(soort,mv,basis)" pt="n" rel="su" root="computer" sense="computer" word="Computers"/><node begin="1" end="2" frame="verb(hebben,pl,aux(te_inf))" id="3" infl="pl" lcat="smain" lemma="hoeven" pos="verb" postag="WW(pv,tgw,mv)" pt="ww" pvagr="mv" pvtijd="tgw" rel="hd" root="hoef" sc="aux(te_inf)" sense="hoef" tense="present" word="hoeven" wvorm="pv"/><node begin="0" cat="ti" end="7" id="4" rel="vc"><node begin="4" end="5" frame="complementizer(te)" id="5" lcat="cp" lemma="te" pos="comp" postag="VZ(init)" pt="vz" rel="cmp" root="te" sc="te" sense="te" vztype="init" word="te"/><node begin="0" cat="inf" end="7" id="6" rel="body"><node begin="0" end="1" id="7" index="1" rel="su"/><node begin="5" buiging="zonder" end="6" frame="verb('hebben/zijn',inf,aux(inf))" id="8" infl="inf" lcat="inf" lemma="kunnen" pos="verb" positie="vrij" postag="WW(inf,vrij,zonder)" pt="ww" rel="hd" root="kan" sc="aux(inf)" sense="kan" word="kunnen" wvorm="inf"/><node begin="0" cat="inf" end="7" id="9" rel="vc"><node begin="0" end="1" id="10" index="1" rel="su"/><node begin="2" cat="np" end="4" id="11" rel="obj1"><node begin="2" buiging="zonder" end="3" frame="determiner(geen,nwh,mod,pro,yparg,nwkpro,geen)" id="12" infl="geen" lcat="detp" lemma="geen" naamval="stan" npagr="agr" pdtype="det" pos="det" positie="prenom" postag="VNW(onbep,det,stan,prenom,zonder,agr)" pt="vnw" rel="det" root="geen" sense="geen" vwtype="onbep" wh="nwh" word="geen"/><node begin="3" end="4" frame="noun(het,mass,sg)" gen="het" genus="onz" getal="ev" graad="basis" id="13" lcat="np" lemma="Nederlands" naamval="stan" ntype="eigen" num="sg" pos="noun" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="hd" root="Nederlands" sense="Nederlands" word="Nederlands"/></node><node begin="6" buiging="zonder" end="7" frame="verb(hebben,inf(no_e),transitive)" id="14" infl="inf(no_e)" lcat="inf" lemma="verstaan" pos="verb" positie="vrij" postag="WW(inf,vrij,zonder)" pt="ww" rel="hd" root="versta" sc="transitive" sense="versta" word="verstaan" wvorm="inf"/></node></node></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.p.2.s.2"><node begin="0" cat="top" end="14" id="0" rel="top"><node begin="7" end="8" frame="punct(komma)" id="1" lcat="punct" lemma="," pos="punct" postag="LET()" pt="let" rel="--" root="," sense="," special="komma" word=","/><node begin="10" end="11" frame="punct(komma)" id="2" lcat="punct" lemma="," pos="punct" postag="LET()" pt="let" rel="--" root="," sense="," special="komma" word=","/><node begin="0" cat="smain" end="13" id="3" rel="--"><node begin="0" end="1" frame="er_adverb(voor)" id="4" lcat="pp" lemma="daarvoor" pos="pp" postag="BW()" pt="bw" rel="mod" root="daarvoor" sense="daarvoor" special="er" word="Daarvoor"/><node begin="1" end="2" frame="verb(unacc,sg3,intransitive)" id="5" infl="sg3" lcat="smain" lemma="verlopen" pos="verb" postag="WW(pv,tgw,met-t)" pt="ww" pvagr="met-t" pvtijd="tgw" rel="hd" root="verloop" sc="intransitive" sense="verloop" tense="present" word="verloopt" wvorm="pv"/><node begin="2" cat="np" end="5" id="6" rel="su"><node begin="2" end="3" frame="determiner(de)" id="7" infl="de" lcat="detp" lemma="de" lwtype="bep" naamval="stan" npagr="rest" pos="det" postag="LID(bep,stan,rest)" pt="lid" rel="det" root="de" sense="de" word="de"/><node aform="base" begin="3" buiging="met-e" end="4" frame="adjective(e)" graad="basis" id="8" infl="e" lcat="ap" lemma="menselijk" naamval="stan" pos="adj" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mod" root="menselijk" sense="menselijk" vform="adj" word="menselijke"/><node begin="4" end="5" frame="noun(de,count,sg)" gen="de" genus="zijd" getal="ev" graad="basis" id="9" lcat="np" lemma="communicatie" naamval="stan" ntype="soort" num="sg" pos="noun" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="hd" root="communicatie" sense="communicatie" word="communicatie"/></node><node begin="5" cat="ap" end="7" id="10" rel="mod"><node begin="5" end="6" frame="intensifier" id="11" lcat="advp" lemma="te" pos="adv" postag="BW()" pt="bw" rel="mod" root="te" sense="te" special="intensifier" word="te"/><node aform="base" begin="6" buiging="zonder" end="7" frame="adjective(no_e(adv))" graad="basis" id="12" infl="no_e" lcat="ap" lemma="subtiel" pos="adj" positie="vrij" postag="ADJ(vrij,basis,zonder)" pt="adj" rel="hd" root="subtiel" sense="subtiel" vform="adj" word="subtiel"/></node><node begin="8" cat="ppart" end="10" id="13" rel="mod"><node begin="8" end="9" frame="intensifier" id="14" lcat="advp" lemma="te" pos="adv" postag="VZ(init)" pt="vz" rel="mod" root="te" sense="te" special="intensifier" vztype="init" word="te"/><node aform="base" begin="9" buiging="zonder" end="10" frame="adjective(ge_no_e(adv))" id="15" infl="no_e" lcat="ppart" lemma="nuanceren" pos="adj" positie="vrij" postag="WW(vd,vrij,zonder)" pt="ww" rel="hd" root="genuanceerd" sense="genuanceerd" vform="psp" word="genuanceerd" wvorm="vd"/></node><node begin="11" cat="ap" end="13" id="16" rel="mod"><node begin="11" end="12" frame="intensifier" id="17" lcat="advp" lemma="te" pos="adv" postag="BW()" pt="bw" rel="mod" root="te" sense="te" special="intensifier" word="te"/><node aform="base" begin="12" buiging="zonder" end="13" frame="adjective(no_e(adv))" graad="basis" id="18" infl="no_e" lcat="ap" lemma="rijk" pos="adj" positie="vrij" postag="ADJ(vrij,basis,zonder)" pt="adj" rel="hd" root="rijk" sense="rijk" vform="adj" word="rijk"/></node></node><node begin="13" end="14" frame="punct(punt)" id="19" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds></treebank>
Run Code Online (Sandbox Code Playgroud)
Pipfile, in case you wish to set up a local environment for testing (the above script probably works with 3.4 and up, though):
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
lxml = ">=4.2.1"
[dev-packages]
[requires]
python_version = "3.6"
Run Code Online (Sandbox Code Playgroud)
Depending on how you named the XML, the output of the script will be something as follows:
Processing WRPEE-dummy sentence 1 topconjmwu
Processing WRPEE-dummy sentence 2 top
Processing WRPEE-dummy sentence 3 topmwu
Processing WRPEE-dummy sentence 4 top
Processing WRPEE-dummy sentence 5 topmwu
Processing WRPEE-dummy sentence 6 top
Processing WRPEE-dummy sentence 7 topnpmwu
Processing WRPEE-dummy sentence 8 top
Processing WRPEE-dummy sentence 9 topnpppconj
Processing WRPEE-dummy sentence 10 topsmaintiinfinfnp
Processing WRPEE-dummy sentence 11 topsmainnpapppartap
pattern counter: Counter()
filenames: set()
filehandles: {}
Run Code Online (Sandbox Code Playgroud)
This shows that the variables that I want to be shared between processes is indeed not shared or modified. (I am aware that this happens because the process and its variables are forked.) In this dummy example it may not seem very important (even though it is clear that now I can't close the opened file handles), but in the large program these and other variables are modified and used inside the forked processes. The question is, then, how to make that work.
I read about Pipe() and Queue() and it seems to me that I would need Queue(). In addition, because I will be reading and writing very often from the different processes to the same object I think I need a Manager() as well. This is the part that I am uncertain about, however. I tried reading this topic but it only confused me more. Furthermore, the comments suggest that even on 3.6.4 there may be issues when using complex objects. Remember that the actual data that I am sharing is nested and consists of different types.
The question, in summary, thus is: how can I re-write the above example code to ensure that all Processes have (non-blocking) read/write access to instance variables inside _process_node and methods that are called from within that process? I am willing to update my current Python version (3.6.4) to 3.7, and to use additional libraries.
该multiprocessing库允许您在并发Python代码中使用并行性.如果没有multiprocessingPython GIL会妨碍真正的并行执行,但是您应该看到multiprocessing代码与其他并发技术没有区别.基本上,multiprocessing线程和线程之间的最大区别是状态是通过慢速IPC调用共享的.
这意味着您需要仔细处理共享资源.您当前的实施并没有做得很好; 您有多个并发任务访问共享资源,而不考虑其他人可能正在做什么.代码中存在多种竞争条件,其中多个任务可以写入同一文件,或者嵌套数据结构更新而不考虑其他更新.
当您必须更新共享数据结构或文件时,通常可以在两个选项之间进行选择:
请注意,您必须明确地将这些对象(同步原语或队列)中的任何一个传递给子进程,请参阅编程指南 ; 不要在实例上使用引用来共享状态.
对于你的情况,我会选择排队和专门的任务; 您的瓶颈是数据处理,将数据写入磁盘并使用分析任务的结果更新一些数据结构相对较快.
所以使用单个任务写入文件; 只需将序列化的XML字符串与cat值一起放入专用队列,并有一个单独的任务将这些从队列中拉出来并将它们写入文件.然后,这个单独的任务负责所有文件访问,包括打开和关闭.这序列化文件访问并消除了竞争条件和破坏写入的可能性.如果这些文件的数据如此密集和快速以至于使此任务成为瓶颈,则为每个目标文件创建任务.
对共享数据结构执行相同操作; 将突变发送到队列,将其留给专用任务来合并数据.更新代理对象并不合适,因为它们的更改通过RPC调用传播到其他进程,增加了竞争条件的可能性,并且锁定不能保证数据在所有任务进程中保持一致!
对于您的简单示例,Counter()实际上不会共享对象的更新; 每个子进程在分叉和更新本地副本时都会继承副本,并且父进程永远不会看到所做的更改.因此,您将使用本地新Counter()实例,并将其推送到队列中.然后,专用任务可以从队列接收这些内容,并Counter()使用这些值更新本地实例total_counter.update(queued_counter),再次确保序列化更新.
为了说明,这是一个计算Lorem Ipsum数据的人为例子; 一系列count_words任务执行计数,但将Counter()它们生成的对象传递给队列以进行单独的整理任务,以组合成最终的单词计数.单独的日志记录任务将数据从日志记录队列写入磁盘:
import datetime
import random
import re
import time
from collections import Counter
from functools import partial
from multiprocessing import Manager, Pool
from io import TextIOWrapper
from urllib.request import urlopen
COMPLETE = "COMPLETE"
def collating_task(countsqueue, logqueue):
wordcounts = Counter()
# Loop until COMPLETE is found in the queue
for counts in iter(countsqueue.get, COMPLETE):
wordcounts.update(counts)
logqueue.put(
f"collating: updating with {len(counts)} words "
f"(total {len(wordcounts)})"
)
return wordcounts
def logging_task(logqueue):
# Loop until COMPLETE is found in the queue
with open('logfile.txt', 'w') as logf:
for message in iter(logqueue.get, COMPLETE):
print(datetime.datetime.now(), message, flush=True, file=logf)
def count_words(line, countsqueue, logqueue):
findwords = re.compile(r"\w+").findall
counts = Counter(findwords(line))
logqueue.put(f"counting: counted {len(counts)} words")
# a random short delay to make this task 'heavy'
time.sleep(random.uniform(0.0, 0.05))
countsqueue.put(counts)
def main():
# Random latin text, 1000 paragraphs
loripsum_response = urlopen("https://loripsum.net/api/1000/long/plaintext")
text = list(TextIOWrapper(loripsum_response, encoding="utf8"))
print(f"Will process {len(text)} lines of data")
# create managed queues that can be passed in as arguments. The alternative
# is to create globals or Process() objects with queues passed in.
manager = Manager()
countsqueue, logqueue = manager.Queue(), manager.Queue()
with Pool() as pool:
# start processing tasks, these loop forever until signalled
collator = pool.apply_async(collating_task, (countsqueue, logqueue))
logger = pool.apply_async(logging_task, (logqueue,))
# process lines, blocks until complete
pool.map(partial(count_words, countsqueue=countsqueue, logqueue=logqueue), text)
countsqueue.put(COMPLETE)
wordcounts = collator.get()
logqueue.put(COMPLETE)
logger.wait()
print(f"Counted {len(wordcounts)} different words; top 5 is:")
for word, count in wordcounts.most_common(5):
print(f'{word:<10} {count:4d}')
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
它产生的东西如下:
Will process 2000 lines of data
Counted 5651 different words; top 5 is:
et 2078
in 2074
est 2036
non 1911
ut 1477
Run Code Online (Sandbox Code Playgroud)
并将logfile.txt信息推入记录队列.
| 归档时间: |
|
| 查看次数: |
189 次 |
| 最近记录: |