如何在python中异步处理xml?

dec*_*rig 6 python xml multithreading sax

我有一个大的XML数据文件(> 160M)来处理,似乎SAX/expat/pulldom解析是要走的路.我希望有一个线程可以筛选节点并将节点推送到队列中,然后其他工作线程将下一个可用节点拉出队列并进行处理.

我有以下(它应该有锁,我知道 - 它会,后来)

import sys, time
import xml.parsers.expat
import threading

q = []

def start_handler(name, attrs):
    q.append(name)

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    print(q)
    time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

问题是while块的主体只被调用一次,然后我甚至无法ctrl-C中断它.在较小的文件上,输出是预期的,但这似乎表明只有在完全解析文档时才调用处理程序,这似乎违背了SAX解析器的目的.

我确定这是我自己的无知,但我不知道我在哪里弄错了.

PS:我也试过改变start_handler:

def start_handler(name, attrs):
    def app():
        q.append(name)
    u = threading.Thread(group=None, target=app)
    u.start()
Run Code Online (Sandbox Code Playgroud)

但是没有爱.

Bri*_*nna 7

我对这个问题不太确定.我猜测对ParseFile的调用是阻塞的,因为GIL只运行解析线程.解决这个问题的方法是使用multiprocessing.无论如何,它旨在与队列一起工作.

你做了一个Process,你可以通过它Queue:

import sys, time
import xml.parsers.expat
import multiprocessing
import Queue

def do_expat(q):
    p = xml.parsers.expat.ParserCreate()

    def start_handler(name, attrs):
        q.put(name)

    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_expat, args=(q,))
    process.start()

    elements = []
    while True:
        while True:
            try:
                elements.append(q.get_nowait())
            except Queue.Empty:
                break

        print elements
        time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

我已经包含了一个元素列表,只是为了复制原始脚本.您的最终解决方案可能会使用get_nowaitPool类似的东西.


Ale*_*lli 7

ParseFile正如你所注意到的那样,只需"吞下"所有东西 - 对你想做的增量解析没有好处!因此,只需一次将文件提供给解析器,确保有条件地控制其他线程 - 例如:

while True:
  data = f.read(BUFSIZE)
  if not data:
    p.Parse('', True)
    break
  p.Parse(data, False)
  time.sleep(0.0)
Run Code Online (Sandbox Code Playgroud)

time.sleep(0.0)呼叫是Python的方式说"提供给其他线程如果有任何准备和等待"; 这个Parse方法记录在这里.

第二点是,忘记锁定这个用法!- 使用Queue.Queue,它本质上是线程安全的,几乎总是在Python中协调多个线程的最佳和最简单的方法.只需在它上面创建一个Queue实例q,q.put(name)然后让线程阻塞q.get()等待进行更多工作 - 这很简单!

(当没有更多的工作要做时,有几种辅助策略可以用来协调工作线程的终止,但最简单的,没有特殊要求的是,只需要让它们成为守护进程线程,这样它们都会在主要时终止线程确实 - 请参阅文档).