Why does this Python 0MQ script for distributed computing hang at a fixed input size?

use*_*394 6 python parallel-processing distributed-computing multiprocessing zeromq

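I recently started learning 0MQ. Earlier today I came across a blog post, Python Multiprocessing with ZeroMQ. It discusses the ventilator pattern from the 0MQ Guide, which I had read about, so I decided to give it a try.

Instead of having the workers compute products of numbers as the original code does, I decided to have the ventilator send large arrays to the workers via 0mq messages. Below is the code I have been using for my "experiments".

As noted in a comment in the code below, whenever I try to increase the variable string_length to a value larger than 3MB, the code hangs.

Typical symptom: say we set string_length to 4MB (i.e. 4194304). The result manager may then get the result from one worker, after which the code just pauses. htop shows the 2 cores not doing much. The Etherape network traffic monitor shows no traffic on the lo interface either.

So far, after hours of looking around, I have not been able to figure out what is causing this, and I would appreciate a hint or two about the cause and a possible fix. Thanks!

I am running Ubuntu 11.04 64bit on a Dell notebook with an Intel Core CPU, 8GB RAM, 80GB Intel X25MG2 SSD, Python 2.7.1+, libzmq1 2.1.10-1chl1~natty1, python-pyzmq 2.1.10-1chl1~natty1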

import time
import zmq
from multiprocessing import Process, cpu_count

np = cpu_count() 
pool_size = np
number_of_elements = 128
# Odd, why once the slen is bumped to 3MB or above, the code hangs?
string_length = 1024 * 1024 * 3

def create_inputs(nelem, slen, pb=True):
    '''
    Generates an array that contains nelem fix-sized (of slen bytes)
    random strings and an accompanying array of hexdigests of the 
    former's elements.  Both are returned in a tuple.

    :type nelem: int
    :param nelem: The desired number of elements in the to be generated
                  array.
    :type slen: int
    :param slen: The desired number of bytes of each array element.
    :type pb: bool
    :param pb: If True, displays a text progress bar during input array
               generation.
    '''
    from os import urandom
    import sys
    import hashlib

    if pb:
        if nelem <= 64:
            toolbar_width = nelem
            chunk_size = 1
        else:
            toolbar_width = 64
            chunk_size = nelem // toolbar_width
        description = '%d random strings of %d bytes. ' % (nelem, slen) 
        s = ''.join(('Generating an array of ', description, '...\n'))
        sys.stdout.write(s)
        # create an ASCII progress bar
        sys.stdout.write("[%s]" % (" " * toolbar_width))
        sys.stdout.flush()
        sys.stdout.write("\b" * (toolbar_width+1)) 
    array   = list()
    hash4a  = list()
    try:
        for i in range(nelem):
            e = urandom(int(slen))
            array.append(e)
            h = hashlib.md5()
            h.update(e)
            he = h.hexdigest()
            hash4a.append(he)
            i += 1
            if pb and i and i % chunk_size == 0:
                sys.stdout.write("-")
                sys.stdout.flush()
        if pb:
            sys.stdout.write("\n")
    except MemoryError:
        print('Memory Error: discarding existing arrays')
        array  = list()
        hash4a = list()
    finally:
        return array, hash4a

# The "ventilator" function generates an array of nelem fix-sized (of slen
# bytes long) random strings, and sends the array down a zeromq "PUSH"
# connection to be processed by listening workers, in a round robin load
# balanced fashion.

def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    # Give everything a second to spin up and connect
    time.sleep(1)

    # Create the input array
    nelem = number_of_elements
    slen = string_length
    payloads = create_inputs(nelem, slen)

    # Send an array to each worker
    for num in range(np):
        work_message = { 'num' : payloads }
        ventilator_send.send_pyobj(work_message)

    time.sleep(1)

# The "worker" functions listen on a zeromq PULL connection for "work"
# (array to be processed) from the ventilator, get the length of the array
# and send the results down another zeromq PUSH connection to the results
# manager.

def worker(wrk_num):
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    # Set up a channel to send result of work to the results reporter
    results_sender = context.socket(zmq.PUSH)
    results_sender.connect("tcp://127.0.0.1:5558")

    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    control_receiver.setsockopt(zmq.SUBSCRIBE, "")

    # Set up a poller to multiplex the work receiver and control receiver channels
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    poller.register(control_receiver, zmq.POLLIN)

    # Loop and accept messages from both channels, acting accordingly
    while True:
        socks = dict(poller.poll())

        # If the message came from work_receiver channel, get the length
        # of the array and send the answer to the results reporter
        if socks.get(work_receiver) == zmq.POLLIN:
            #work_message = work_receiver.recv_json()
            work_message = work_receiver.recv_pyobj()
            length = len(work_message['num'][0])
            answer_message = { 'worker' : wrk_num, 'result' : length }
            results_sender.send_json(answer_message)

        # If the message came over the control channel, shut down the worker.
        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == "FINISHED":
                print("Worker %i received FINISHED, quitting!" % wrk_num)
                break

# The "results_manager" function receives each result from multiple workers,
# and prints those results.  When all results have been received, it signals
# the worker processes to shut down.

def result_manager():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive results
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")

    # Set up a channel to send control commands
    control_sender = context.socket(zmq.PUB)
    control_sender.bind("tcp://127.0.0.1:5559")

    for task_nbr in range(np):
        result_message = results_receiver.recv_json()
        print "Worker %i answered: %i" % (result_message['worker'], result_message['result'])

    # Signal to all workers that we are finished
    control_sender.send("FINISHED")
    time.sleep(5)

if __name__ == "__main__":

    # Create a pool of workers to distribute work to
    for wrk_num in range(pool_size):
        Process(target=worker, args=(wrk_num,)).start()

    # Fire up our result manager...
    result_manager = Process(target=result_manager, args=())
    result_manager.start()

    # Start the ventilator!
    ventilator = Process(target=ventilator, args=())
    ventilator.start()

min*_*nrk 6

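The problem is that your ventilator (PUSH) socket is closing before it has finished sending. You sleep for 1s at the end of the ventilator function, which is not enough time to send 384MB messages. That is why the threshold sits where it does: with a shorter sleep, the threshold would be lower.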
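To make the 384MB figure concrete, here is a rough back-of-the-envelope sketch. The helper names `raw_payload_bytes` and `pickled_size` are made up for illustration and are not part of the question's code. It assumes `send_pyobj` pickles the object (which is how pyzmq serializes Python objects) and ignores the small list of hexdigests that the question's `create_inputs` also returns.

```python
import pickle
from os import urandom

MIB = 1024 * 1024

def raw_payload_bytes(nelem, slen):
    """Bytes of random string data carried by one work message
    (serialization overhead not included)."""
    return nelem * slen

def pickled_size(nelem, slen):
    """Size in bytes of a pickled {'num': [...]} message.
    Only call this with small inputs; it really allocates the data."""
    array = [urandom(slen) for _ in range(nelem)]
    return len(pickle.dumps({'num': array}, protocol=pickle.HIGHEST_PROTOCOL))

if __name__ == "__main__":
    # number_of_elements = 128 in the question's script
    for mib in (3, 4):
        total = raw_payload_bytes(128, mib * MIB)
        print("%d MiB strings: %d MiB per message" % (mib, total // MIB))
    # Pickle adds a little framing on top of the raw bytes
    print("pickled size of 4 x 1024 bytes:", pickled_size(4, 1024))
```

Note that the script sends one such message per worker, so the ventilator has to push this amount once per CPU core before it can safely exit.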

That said, LINGER is supposed to prevent this sort of thing, so I would bring this up with zeromq: PUSH does not appear to respect LINGER.
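For reference, this is roughly what LINGER is meant to give you. It is a minimal sketch, assuming Python 3 and a reasonably recent pyzmq, not the Python 2.7 / pyzmq 2.1.10 setup from the question, where PUSH appeared not to honor it. The function name `send_then_shutdown`, the random port, and the timeout values are illustrative choices, not from the original script.

```python
import zmq

def send_then_shutdown(payload, linger_ms=5000):
    """Send one message over PUSH/PULL, close the PUSH socket right away,
    and rely on LINGER to keep the queued message alive until delivery."""
    ctx = zmq.Context()

    push = ctx.socket(zmq.PUSH)
    # LINGER: how long (ms) unsent messages are kept after close()
    push.setsockopt(zmq.LINGER, linger_ms)
    port = push.bind_to_random_port("tcp://127.0.0.1")

    pull = ctx.socket(zmq.PULL)
    pull.setsockopt(zmq.LINGER, 0)
    pull.setsockopt(zmq.RCVTIMEO, 5000)  # fail instead of hanging forever
    pull.connect("tcp://127.0.0.1:%d" % port)

    try:
        push.send(payload)   # blocks until a peer is connected
        push.close()         # returns at once; delivery continues in background
        return pull.recv()
    finally:
        if not push.closed:
            push.close()
        pull.close()
        ctx.term()           # waits for lingering sockets to finish or time out

if __name__ == "__main__":
    data = b"x" * (1024 * 1024)
    print(len(send_then_shutdown(data)), "bytes received after early close")
```

The part that matters is the explicit `close()` followed by `ctx.term()`, which gives the library a chance to flush before the process exits. Whether that alone would cure the hang on libzmq 2.1.10 is not something this thread confirms.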

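A fix for your particular example (without adding an indeterminately long sleep) would be to use the same FINISH signal to terminate your ventilator as you use for your workers. That way, you guarantee that your ventilator stays alive for as long as it needs to.

Modified ventilator: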

def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    # Set up a channel to receive control messages
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    control_receiver.setsockopt(zmq.SUBSCRIBE, "")

    # Give everything a second to spin up and connect
    time.sleep(1)

    # Create the input array
    nelem = number_of_elements
    slen = string_length
    payloads = create_inputs(nelem, slen)

    # Send an array to each worker
    for num in range(np):
        work_message = { 'num' : payloads }
        ventilator_send.send_pyobj(work_message)

    # Poll for FINISH message, so we don't shutdown too early
    poller = zmq.Poller()
    poller.register(control_receiver, zmq.POLLIN)

    while True:
        socks = dict(poller.poll())

        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == "FINISHED":
                print("Ventilator received FINISHED, quitting!")
                break
            # else: unhandled message

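  • Perhaps it is not worth bringing up with the 'pyzmq folks', since that is basically me right now. I have raised it with libzmq and written a simpler test case in C: https://gist.github.com/1643223 (3 upvotes)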