如何在保持订单的同时在发电机上使用螺纹?

hob*_*es3 6 python multithreading generator python-multithreading python-2.7

我有一个简单的代码,对我正在尝试加速的生成器中的每个项目运行GET请求:

def stream(self, records):
    # type(records) = <type 'generator'>
    for record in records:
        # record = OrderedDict([('_time', '1518287568'), ('data', '5552267792')])
        output = rest_api_lookup(record[self.input_field])

        record.update(output)
        yield record
Run Code Online (Sandbox Code Playgroud)

现在,它在单个线程上运行并且永远需要,因为每个REST调用都会等到上一个REST调用完成.

在使用这个很棒的答案(/sf/answers/1992428651/)之前,我已经在列表中使用Python中的多线程,但是我不确定如何在生成器上重用相同的策略而不是名单.

我得到了一位开发人员的建议,他建议我将生成器分解为100个元素列表,然后关闭池,但我不知道如何从生成器创建这些列表.

我还需要保留原始订单,因为我需要yield record按正确的顺序.

dns*_*wlt 5

我假设您不想records首先将生成器变成列表。加快处理速度的一种方法是将记录按ThreadPoolExecutor块传递。执行器将rest_api_lookup同时处理该块的所有项目。然后你只需要“分解”你的结果。这是一些运行示例代码(抱歉,它不使用类,但我希望它能显示原理):

from concurrent.futures import ThreadPoolExecutor
from time import sleep

pool = ThreadPoolExecutor(8) # 8 threads, adjust to taste and # of cores

def records():
    # simulates records generator
    for i in range(100):
        yield {'a': i}


def rest_api_lookup(a):
    # simulates REST call :)
    sleep(0.1)
    return {'b': -a}


def stream(records):
    def update_fun(record):
        output = rest_api_lookup(record['a'])
        record.update(output)
        return record
    chunk = []
    for record in records:
        # submit update_fun(record) into pool, keep resulting Future
        chunk.append(pool.submit(update_fun, record))
        if len(chunk) == 8:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

def unchunk(chunk_gen):
    """Flattens a generator of Future chunks into a generator of Future results."""
    for chunk in chunk_gen:
        for f in chunk:
            yield f.result() # get result from Future

# Now iterate over all results in same order as generated by records()    
for result in unchunk(stream(records())):
    print(result)
Run Code Online (Sandbox Code Playgroud)

哈!

更新:我sleep在模拟的 REST 调用中添加了一个,以使其更加真实。这个分块版本在我的机器上只需 1.5 秒即可完成。顺序版本需要 10 秒(正如预期的那样,100 * 0.1 秒 = 10 秒)。