hob*_*es3 6 python multithreading generator python-multithreading python-2.7
我有一个简单的代码,对我正在尝试加速的生成器中的每个项目运行GET请求:
def stream(self, records):
# type(records) = <type 'generator'>
for record in records:
# record = OrderedDict([('_time', '1518287568'), ('data', '5552267792')])
output = rest_api_lookup(record[self.input_field])
record.update(output)
yield record
Run Code Online (Sandbox Code Playgroud)
现在,它在单个线程上运行并且永远需要,因为每个REST调用都会等到上一个REST调用完成.
在使用这个很棒的答案(/sf/answers/1992428651/)之前,我已经在列表中使用Python中的多线程,但是我不确定如何在生成器上重用相同的策略而不是名单.
我得到了一位开发人员的建议,他建议我将生成器分解为100个元素列表,然后关闭池,但我不知道如何从生成器创建这些列表.
我还需要保留原始订单,因为我需要yield record按正确的顺序.
我假设您不想records首先将生成器变成列表。加快处理速度的一种方法是将记录按ThreadPoolExecutor块传递。执行器将rest_api_lookup同时处理该块的所有项目。然后你只需要“分解”你的结果。这是一些运行示例代码(抱歉,它不使用类,但我希望它能显示原理):
from concurrent.futures import ThreadPoolExecutor
from time import sleep
pool = ThreadPoolExecutor(8) # 8 threads, adjust to taste and # of cores
def records():
# simulates records generator
for i in range(100):
yield {'a': i}
def rest_api_lookup(a):
# simulates REST call :)
sleep(0.1)
return {'b': -a}
def stream(records):
def update_fun(record):
output = rest_api_lookup(record['a'])
record.update(output)
return record
chunk = []
for record in records:
# submit update_fun(record) into pool, keep resulting Future
chunk.append(pool.submit(update_fun, record))
if len(chunk) == 8:
yield chunk
chunk = []
if chunk:
yield chunk
def unchunk(chunk_gen):
"""Flattens a generator of Future chunks into a generator of Future results."""
for chunk in chunk_gen:
for f in chunk:
yield f.result() # get result from Future
# Now iterate over all results in same order as generated by records()
for result in unchunk(stream(records())):
print(result)
Run Code Online (Sandbox Code Playgroud)
哈!
更新:我sleep在模拟的 REST 调用中添加了一个,以使其更加真实。这个分块版本在我的机器上只需 1.5 秒即可完成。顺序版本需要 10 秒(正如预期的那样,100 * 0.1 秒 = 10 秒)。
| 归档时间: |
|
| 查看次数: |
608 次 |
| 最近记录: |