我想编写一个执行以下操作的python脚本(称之为父级):
(1)定义了一个多维numpy数组
(2) forks 10个不同的python脚本(称为孩子).它们中的每一个都必须能够在任何单个时间点(只要它们存活)从(1)read中获得numpy数组的内容.
(3)每个子脚本都会自己完成工作(孩子们不要互相分享任何信息)
(4)在任何时间点,父脚本必须能够接受来自其所有子节点的消息.这些消息将由父进行解析,并使(1)中的numpy数组发生更改.
python在Linux环境中工作时,我该如何解决这个问题?我想过使用zeroMQ并让父母成为一个订阅者,而孩子们都是出版商 ; 它有意义还是有更好的方法呢?
另外,如何允许所有子节点连续读取父节点numpy定义的数组内容?
Dan*_*ocq 14
sub通道不必是要绑定的通道,因此您可以让订户绑定,并且每个子pub通道都可以连接到该通道并发送它们的消息.在这种特殊情况下,我认为该multiprocessing模块更适合,但我认为它提到:
import zmq
import threading
# So that you can copy-and-paste this into an interactive session, I'm
# using threading, but obviously that's not what you'd use
# I'm the subscriber that multiple clients are writing to
def parent():
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, 'Child:')
# Even though I'm the subscriber, I'm allowed to get this party
# started with `bind`
socket.bind('tcp://127.0.0.1:5000')
# I expect 50 messages
for i in range(50):
print 'Parent received: %s' % socket.recv()
# I'm a child publisher
def child(number):
context = zmq.Context()
socket = context.socket(zmq.PUB)
# And even though I'm the publisher, I can do the connecting rather
# than the binding
socket.connect('tcp://127.0.0.1:5000')
for data in range(5):
socket.send('Child: %i %i' % (number, data))
socket.close()
threads = [threading.Thread(target=parent)] + [threading.Thread(target=child, args=(i,)) for i in range(10)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
Run Code Online (Sandbox Code Playgroud)
特别是,文档的核心消息传递模式部分讨论了这样一个事实:对于模式,任何一方都可以绑定(和另一方连接).
我认为使用 PUSH/PULL 套接字更有意义,因为您有一个标准的Ventilator - Workers - Sink场景,只不过 Ventilator 和 Sink 是相同的进程。
另外,请考虑使用多处理模块而不是 ZeroMQ。可能会容易一些。