dec*_*rig 6 python xml multithreading sax
我有一个大的XML数据文件(> 160M)来处理,似乎SAX/expat/pulldom解析是要走的路.我希望有一个线程可以筛选节点并将节点推送到队列中,然后其他工作线程将下一个可用节点拉出队列并进行处理.
我有以下(它应该有锁,我知道 - 它会,后来)
import sys, time
import xml.parsers.expat
import threading
q = []
def start_handler(name, attrs):
q.append(name)
def do_expat():
p = xml.parsers.expat.ParserCreate()
p.StartElementHandler = start_handler
p.buffer_text = True
print("opening {0}".format(sys.argv[1]))
with open(sys.argv[1]) as f:
print("file is open")
p.ParseFile(f)
print("parsing complete")
t = threading.Thread(group=None, target=do_expat)
t.start()
while True:
print(q)
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
问题是while
块的主体只被调用一次,然后我甚至无法ctrl-C中断它.在较小的文件上,输出是预期的,但这似乎表明只有在完全解析文档时才调用处理程序,这似乎违背了SAX解析器的目的.
我确定这是我自己的无知,但我不知道我在哪里弄错了.
PS:我也试过改变start_handler
:
def start_handler(name, attrs):
def app():
q.append(name)
u = threading.Thread(group=None, target=app)
u.start()
Run Code Online (Sandbox Code Playgroud)
但是没有爱.
我对这个问题不太确定.我猜测对ParseFile的调用是阻塞的,因为GIL只运行解析线程.解决这个问题的方法是使用multiprocessing
.无论如何,它旨在与队列一起工作.
import sys, time
import xml.parsers.expat
import multiprocessing
import Queue
def do_expat(q):
p = xml.parsers.expat.ParserCreate()
def start_handler(name, attrs):
q.put(name)
p.StartElementHandler = start_handler
p.buffer_text = True
print("opening {0}".format(sys.argv[1]))
with open(sys.argv[1]) as f:
print("file is open")
p.ParseFile(f)
print("parsing complete")
if __name__ == '__main__':
q = multiprocessing.Queue()
process = multiprocessing.Process(target=do_expat, args=(q,))
process.start()
elements = []
while True:
while True:
try:
elements.append(q.get_nowait())
except Queue.Empty:
break
print elements
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
我已经包含了一个元素列表,只是为了复制原始脚本.您的最终解决方案可能会使用get_nowait
和Pool
类似的东西.
ParseFile
正如你所注意到的那样,只需"吞下"所有东西 - 对你想做的增量解析没有好处!因此,只需一次将文件提供给解析器,确保有条件地控制其他线程 - 例如:
while True:
data = f.read(BUFSIZE)
if not data:
p.Parse('', True)
break
p.Parse(data, False)
time.sleep(0.0)
Run Code Online (Sandbox Code Playgroud)
该time.sleep(0.0)
呼叫是Python的方式说"提供给其他线程如果有任何准备和等待"; 这个Parse
方法记录在这里.
第二点是,忘记锁定这个用法!- 使用Queue.Queue,它本质上是线程安全的,几乎总是在Python中协调多个线程的最佳和最简单的方法.只需在它上面创建一个Queue
实例q
,q.put(name)
然后让线程阻塞q.get()
等待进行更多工作 - 这很简单!
(当没有更多的工作要做时,有几种辅助策略可以用来协调工作线程的终止,但最简单的,没有特殊要求的是,只需要让它们成为守护进程线程,这样它们都会在主要时终止线程确实 - 请参阅文档).