在队列上复用.Queue?

Mat*_*ner 12 python queue go multiplexing python-3.x

如何同时"选择"多个queue.Queue

Golang的频道具有所需的功能:

select {
case i1 = <-c1:
    print("received ", i1, " from c1\n")
case c2 <- i2:
    print("sent ", i2, " to c2\n")
case i3, ok := (<-c3):  // same as: i3, ok := <-c3
    if ok {
        print("received ", i3, " from c3\n")
    } else {
        print("c3 is closed\n")
    }
default:
    print("no communication\n")
}
Run Code Online (Sandbox Code Playgroud)

其中,要解锁的第一个通道执行相应的块.我将如何在Python中实现这一目标?

Update0

根据tux21b的答案中给出的链接,所需的队列类型具有以下属性:

  • 多生产者/多消费者队列(MPMC)
  • 提供每个生产者的FIFO/LIFO
  • 当队列为空/完全消费者/生产者被阻止时

此外,渠道可以阻塞,生产者将阻止,直到消费者检索该项目.我不确定Python的Queue可以做到这一点.

tux*_*21b 2

生产者-消费者队列有许多不同的实现,例如可用的queue.Queue。它们通常在许多属性上有所不同,如德米特里·维尤科夫 (Dmitry Vyukov) 的这篇优秀文章中列出的那样。正如您所看到的,有超过 10k 种不同的可能组合。根据需求的不同,用于此类队列的算法也有很大不同。不可能仅仅扩展现有的队列算法来保证附加属性,因为这通常需要不同的内部数据结构和不同的算法。

Go 的通道提供了相对较多的保证属性,因此这些通道可能适合很多程序。最难的要求之一是支持同时读取/阻塞多个通道(select 语句),并且如果 select 语句中的多个分支能够继续,则公平地选择一个通道,这样就不会留下任何消息。Python 的queue.Queue不提供此功能,因此根本不可能用它来归档相同的行为。

因此,如果您想继续使用queue.Queue,您需要找到解决该问题的方法。然而,这些解决方法也有其自身的缺点,并且更难以维护。寻找另一个提供您需要的功能的生产者-消费者队列可能是一个更好的主意!无论如何,这里有两种可能的解决方法:

轮询

while True:
  try:
    i1 = c1.get_nowait()
    print "received %s from c1" % i1
  except queue.Empty:
    pass
  try:
    i2 = c2.get_nowait()
    print "received %s from c2" % i2
  except queue.Empty:
    pass
  time.sleep(0.1)
Run Code Online (Sandbox Code Playgroud)

这可能会在轮询通道时使用大量 CPU 周期,并且当存在大量消息时可能会很慢。使用具有指数退避时间的 time.sleep() (而不是此处显示的恒定 0.1 秒)可能会极大地改进此版本。

单个通知队列

queue_id = notify.get()
if queue_id == 1:
  i1 = c1.get()
  print "received %s from c1" % i1
elif queue_id == 2:
  i2 = c2.get()
  print "received %s from c2" % i2
Run Code Online (Sandbox Code Playgroud)

通过此设置,您必须在发送到 c1 或 c2 后将某些内容发送到通知队列。这可能对您有用,只要只有一个这样的通知队列就足够了(即您没有多个“选择”,每个选择都阻塞在通道的不同子集上)。

或者,您也可以考虑使用 Go。无论如何,Go 的 goroutine 和并发支持比 Python 有限的线程功能强大得多。