与Python中的套接字工作分开计算

Jay*_*Jay 7 python sockets concurrency multithreading

我正在序列化列数据,然后通过套接字连接发送它.就像是:

import array, struct, socket

## Socket setup
s = socket.create_connection((ip, addr))

## Data container setup
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)

for i in range(num_of_chunks):
    ## Binarize data
    columns['col1'] = array.array('i', range(10000))
    columns['col2'] = array.array('f', [float(num) for num in range(10000)])
    .
    .
    .

    ## Send away
    chunk = b''.join(columns[col_name] for col_name in ordered_col_list]
    s.sendall(chunk)
    s.recv(1000)      #get confirmation
Run Code Online (Sandbox Code Playgroud)

我希望将计算与发送分开,将它们放在单独的线程或进程上,这样我就可以在数据被发送时继续进行计算.

我把二值化部分作为生成器函数,然后将生成器发送到一个单独的线程,然后通过队列产生二进制块.

我从主线程中收集了数据并将其发送出去.就像是:

import array, struct, socket
from time import sleep
try:
    import  thread
    from Queue import Queue
except:
    import _thread as thread
    from queue import Queue


## Socket and queue setup
s = socket.create_connection((ip, addr))
chunk_queue = Queue()


def binarize(num_of_chunks):
    ''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''

    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)

    for i in range(num_of_chunks):
        columns['col1'] = array.array('i', range(10000)).tostring()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
        .
        .

        yield b''.join((columns[col_name] for col_name in ordered_col_list))


def chunk_yielder(queue):
    ''' Generate binary chunks and put them on a queue. To be used from a thread '''

    while True:   
        try:
            data_gen = queue.get_nowait()
        except:
            sleep(0.1)
            continue
        else:    
            for chunk in data_gen:
                queue.put(chunk)


## Setup thread and data generator
thread.start_new_thread(chunk_yielder, (chunk_queue,))
num_of_chunks = 100
data_gen = binarize(num_of_chunks)
queue.put(data_gen)


## Get data back and send away
while True:
   try:
        binary_chunk = queue.get_nowait()
    except:
        sleep(0.1)
        continue
    else:    
        socket.sendall(binary_chunk)
        socket.recv(1000) #Get confirmation
Run Code Online (Sandbox Code Playgroud)

但是,我没有看到和性能的影响 - 它没有更快的工作.

我不太了解线程/进程,我的问题是,是否有可能(在所有和Python中)从这种类型的分离中获益,以及使用线程或者线程或者什么是好的方法.处理(或任何其他方式 - 异步等).

编辑:

据我所知 -

  1. Multirpocessing需要序列化任何发送的数据,因此我将双重发送每个计算数据.
  2. 发送通道socket.send()应该释放GIL

因此,我认为(如果我弄错了,请纠正我),线程解决方案是正确的方法.但是我不确定如何正确地做到这一点.

我知道cython可以释放线程的GIL,但由于其中一个只是socket.send/recv,我的理解是它不应该是必要的.

小智 2

如果您尝试使用并发来提高 CPython 的性能,我强烈建议您使用多处理库而不是多线程。正是因为GIL(全局解释器锁),它会对执行速度产生巨大的影响(在某些情况下,它可能会导致你的代码运行得比单线程版本慢)。另外,如果您想了解有关此主题的更多信息,我建议您阅读David Beazley 的演示文稿。多处理通过为每个进程生成一个新的 Python 解释器实例来绕过这个问题,从而使您能够充分利用多核架构。