int*_*ted 9 python queue multithreading
我正在学习使用Queue模块,并且对于如何使队列使用者线程知道队列是完整的而感到有点困惑.理想情况下,我想get()在消费者线程中使用它,并在队列被标记为"完成"时抛出异常.有没有比通过附加标记值来标记队列中的最后一项更好的方式进行通信?
基于Glenn Maynard和其他人的一些建议(谢谢!),我决定推出一个实现方法的后代.它以原始(未打包)模块的形式提供.当我有更多的时间时,我会把它清理干净并正确打包.目前,该模块仅包含类和异常类.我打算将它扩展为包含和的子类.Queue.QueuecloseCloseableQueueClosedQueue.LifoQueueQueue.PriorityQueue
它目前处于一个非常初步的状态,也就是说虽然它通过了它的测试套件,但实际上我还没有用它.你的旅费可能会改变.我将通过令人兴奋的消息更新这个答案.
该CloseableQueue班的略有不同,从格伦在关闭队列将防止未来的建议,putS,但并不妨碍未来gets,至队列被清空.这对我来说最有意义; 似乎清除队列的功能可以作为单独的mixin*添加,该功能与可关性功能正交.所以基本上CloseableQueue,通过关闭队列,你可以指出最后一个元素put.还有一个选项可以通过传递last=True到最终put调用来原子地执行此操作.后续调用put,和后续调用get一旦队列被清空,以及优秀的阻塞的呼叫匹配这些描述,将提高Closed异常.
这对于单个生产者为一个或多个消费者生成数据的情况非常有用,但是对于消费者正在等待特定项目或项目集的多重布置也是有用的.特别是它没有提供确定所有生产者都已完成生产的方法.实现这一目标需要提供一些注册生产者的方法(.open()?),以及表明生产者注册本身已经关闭的方法.
建议和/或代码审查是非常受欢迎的.我没有编写大量的并发代码,但希望测试套件足够彻底,以至于代码传递它的事实表明了代码的质量,而不是套件的缺乏.我能够重用Queue模块测试套件中的一堆代码:文件本身包含在此模块中,并用作各种子类和例程的基础,包括回归测试.这可能(希望)有助于避免测试部门完全无能为力.代码本身只是覆盖Queue.get并且Queue.put具有相当小的更改,并添加了close和closed方法.
我有意避免在代码本身和测试套件中使用像上下文管理器这样的任何新兴功能,以努力使代码保持向后兼容,就像Queue模块本身一样,这确实是倒退的.我可能会在某些时候添加__enter__和__exit__方法; 否则,contextlib的关闭函数应该适用于CloseableQueue实例.
*:这里我松散地使用术语"mixin".由于Queue模块的类是旧式的,因此需要使用类工厂函数混合mixins; 一些限制适用; 在Guido禁止的地方提供空置.
该CloseableQueue模块现在提供CloseableLifoQueue和CloseablePriorityQueue课程.我还添加了一些便利函数来支持迭代.还需要将其作为一个合适的包装进行修改.有一个类工厂函数,可以方便地对其他Queue.Queue派生类进行子类化.
CloseableQueue现在可以通过PyPI获得,例如
$ easy_install CloseableQueue
Run Code Online (Sandbox Code Playgroud)
欢迎提出评论和批评,特别是来自这个答案的匿名downvoter.
队列本身并不具备完成或完成的想法.它们可以无限期使用.要在完成后将其关闭,您确实需要在结尾处放置None或其他一些魔术值并编写逻辑来检查它,如您所述.理想的方法可能是对Queue对象进行子类化.
请参阅http://en.wikipedia.org/wiki/Queue_(data_structure)以了解有关队列的更多信息.
哨兵是一种关闭队列的自然方式,但有几件事需要注意.
首先,请记住您可能有多个消费者,因此您需要为每个正在运行的消费者发送一次哨兵,并保证每个消费者只消耗一个哨兵,以确保每个消费者收到其关闭的哨兵.
其次,请记住Queue定义了一个接口,并且在可能的情况下,代码应该与底层Queue无关.您可能有一个PriorityQueue,或者您可能有一些其他类暴露相同的接口并以其他顺序返回值.
不幸的是,很难处理这两个问题.为了处理不同队列的一般情况,正在关闭的消费者必须在收到其关闭的sentinel之后继续使用值,直到队列为空.这意味着它可能消耗另一个线程的哨兵.这是Queue接口的一个弱点:它应该有一个Queue.shutdown调用,导致所有消费者抛出异常,但这是缺失的.
所以,在实践中:
| 归档时间: |
|
| 查看次数: |
7057 次 |
| 最近记录: |