Python相当于Golang在频道上的选择

Cen*_*lti 19 python go

Go有一个适用于频道的select语句.从文档:

select语句允许goroutine等待多个通信操作.

一个选择块直到其中一个案例可以运行,然后执行该案例.如果多个准备就绪,它会随机选择一个.

是否有Python等效的以下代码:

package main

import "fmt"

func main() {
    c1 := make(chan int)
    c2 := make(chan int)
    quit := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            c1 <- i
        }
        quit <- 0
    }()

    go func() {
        for i := 0; i < 2; i++ {
            c2 <- i
        }
    }()

    for {
        select {
        case <-c1:
            fmt.Println("Received value from c1")
        case <-c2:
            fmt.Println("Received value from c2")
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

该计划的输出:

Received value from c1
Received value from c1
Received value from c2
Received value from c1
Received value from c2
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
quit
Run Code Online (Sandbox Code Playgroud)

Tho*_*mas 12

这是一个非常直接的翻译,但"选择哪个如果多个已经准备好"部分的工作方式不同 - 它只是采取了首先出现的内容.这也就像运行代码一样gomaxprocs(1).

import threading
import Queue

def main():
    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    combined = Queue.Queue(maxsize=0)

    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))

    t = threading.Thread(target=listen_and_forward, args=(c1,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(c2,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(quit,))
    t.daemon = True
    t.start()

    while True:
        which, message = combined.get()
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return
main()
Run Code Online (Sandbox Code Playgroud)

基本的变化是使用组合消息的线程模拟选择.如果您打算多使用此模式,可以编写一些选择代码:

import threading
import Queue

def select(*queues):
    combined = Queue.Queue(maxsize=0)
    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))
    for queue in queues:
        t = threading.Thread(target=listen_and_forward, args=(queue,))
        t.daemon = True
        t.start()
    while True:
        yield combined.get()

def main():

    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    for which, msg in select(c1, c2, quit):
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return
main()
Run Code Online (Sandbox Code Playgroud)

但...

请注意,这个选择并不是很好的选择,尽管它对你的程序没有关系 - 如果我们并不总是遍历,那么goroutine可以在一个将在select中排队的频道发送结果并丢失.选择完成!


Bri*_*sey 11

还可以考虑Benoit Chesneau 的偏移库.它是Python的Go并发模型的一个端口,使用了光纤.

他在PyCon APAC 2013上发表了关于此事的演讲:


Wil*_*ley 7

您可以使用multiprocessing.Pipe来代替chan,threading.Thread代替goselect.select代替的select.

这是使用这种方法在Python中重新实现你的go示例:

import random
from multiprocessing import Pipe
from select import select
from threading import Thread


def main():
    c1_r, c1_w = Pipe(duplex=False)
    c2_r, c2_w = Pipe(duplex=False)
    quit_r, quit_w = Pipe(duplex=False)

    def func1():
        for i in range(10):
            c1_w.send(i)
        quit_w.send(0)

    Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2_w.send(i)

    Thread(target=func2).start()

    while True:
        ready, _, _ = select([c1_r, c2_r, quit_r], [], [])
        which = random.choice(ready)
        if which == c1_r:
            c1_r.recv()
            print 'Received value from c1'
        elif which == c2_r:
            c2_r.recv()
            print 'Received value from c2'
        elif which == quit_r:
            quit_r.recv()
            print 'Received value from quit'
            return

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

这个实现基于@Thomas的实现,但与@ Thomas不同,它不会产生额外的线程来执行select.

使用Python 2.7.13在Linux上测试.Windows可能表现不同,因为select是Unixy的东西.