Threaded queue hangs in Python

wis*_*shi 4 python queue multithreading freeze

I'm trying to make a parser multithreaded via a queue. It seems to work, but my queue hangs. I'd be grateful if someone could tell me how to fix this, since I rarely write multithreaded code.

This code reads from the queue:

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread

l = []
q = Queue.Queue()

def parse_record():
    d = {}
    while not q.empty():
        rec = q.get()
        d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
        # ... many ops like this
        d['dport'] = rec.dport
        l.append(d) # l is global

And this fills the queue:

def parse_records():
    ffile = '/tmp/query.rwf'
    flows = SilkFile(ffile, READ)
    numthreads = 2

    # fill queue
    for rec in flows:
        q.put(rec)
    # work on Queue    
    for i in range(numthreads):
        t = Thread(target = parse_record)
        t.daemon = True
        t.start()

    # blocking
    q.join()

    # never reached    
    data_df = pandas.DataFrame.from_records(l)
    return data_df

I only call parse_records() from my main. It never terminates.

tde*_*ney 5

The Queue.empty documentation says:

...if empty() returns False it doesn't guarantee that a subsequent call to get() will not block.

At the very least you should use get_nowait or you risk data loss. But more importantly, the join will only release when all of the queued items have been marked complete with a Queue.task_done call:

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
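
To make that contract concrete, here is a minimal, self-contained sketch of the put()/get()/task_done()/join() hand-off (illustrative only, independent of the SiLK code; Python 2 Queue module as in the question):

import Queue
from threading import Thread

q = Queue.Queue()

def worker():
    while True:
        item = q.get()        # blocks until an item is available
        # ... process item ...
        q.task_done()         # exactly one task_done() per retrieved item

t = Thread(target=worker)
t.daemon = True
t.start()

for i in range(10):
    q.put(i)

q.join()  # returns only once task_done() has been called for every put() item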

As a side note, l.append(d) is not atomic and should be protected with a lock.

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread, Lock

l = []
l_lock = Lock()
q = Queue.Queue()

def parse_record():
    while 1:
        try:
            rec = q.get_nowait()
            d = {}  # build a fresh dict per record so earlier entries aren't overwritten
            d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
            # ... many ops like this
            d['dport'] = rec.dport
            with l_lock:
                l.append(d) # l is global
            q.task_done()
        except Queue.Empty:
            return
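
With every successful get_nowait() now matched by a task_done(), the q.join() in parse_records() returns once the workers drain the queue, and the producer side needs no changes. A condensed restatement of the question's driver, continuing the module above (sketch for illustration only):

def parse_records():
    flows = SilkFile('/tmp/query.rwf', READ)
    for rec in flows:               # fill the queue first, as in the question
        q.put(rec)
    for i in range(2):              # then start the workers
        t = Thread(target=parse_record)
        t.daemon = True
        t.start()
    q.join()                        # now returns: every item gets task_done()
    return pandas.DataFrame.from_records(l)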

You can shorten your code considerably by using a thread pool from the standard library.

from silk import *
import json
import datetime
import pandas
import multiprocessing.pool

def parse_record(rec):
    d = {}
    d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
    # ... many ops like this
    d['dport'] = rec.dport
    return d

def parse_records():
    ffile = '/tmp/query.rwf'
    flows = SilkFile(ffile, READ)
    pool = multiprocessing.pool.ThreadPool(2)   # thread-based pool with the Pool API
    data_df = pandas.DataFrame.from_records(pool.map(parse_record, flows))
    pool.close()
    return data_df
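
multiprocessing.pool.ThreadPool exposes the same map() API as Pool but runs its workers as threads in the same process, so the SiLK record objects are handed around directly rather than pickled. A minimal driver (a sketch only; assumes the SiLK bindings and the /tmp/query.rwf file from the question are available):

if __name__ == '__main__':
    df = parse_records()
    print df.shape   # Python 2 print statement, matching the Queue-based code above
    print df.head()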