mpt*_*ion 14 python concurrency multithreading asynchronous python-multiprocessing
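I connect to a local server (OSRM) over HTTP to submit routes and get back drive times. I noticed that asynchronous I/O is slower than threading, because the time spent waiting for the calculation seems to be smaller than the time it takes to send the request and process the JSON output. (I think async I/O is better when the server takes some time to process your request, so you don't want to block while you wait; that is not my case.) Threading suffers from the Global Interpreter Lock, so it appears (and the evidence below supports this) that my fastest option is multiprocessing.
The issue with multiprocessing is that it is so quick that it exhausts my sockets and I get an error (requests opens a new connection each time). In serial I can use a requests.Session() object to keep the connection alive, but I can't get this working in parallel (each process has its own session).
The closest code I have working at the moment is this multiprocessing code: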
import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())

def ReqOsrm(url_input):
    ul, qid = url_input
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            # ul is the URL unpacked above (the original printed an undefined req_url)
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
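However, I can't get the HTTPConnectionPool to work properly: it creates new sockets each time (I think) and then gives me this error:
HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
My goal is to get distance calculations from an OSRM routing server that I run locally, as quickly as possible.
My question has two parts. Basically, I am trying to convert some code that uses multiprocessing.Pool() into better code (proper asynchronous functions, so that execution never breaks and it runs as fast as possible).
The problem is that everything I try seems to be slower than multiprocessing (several examples of what I have tried are below).
Some possible methods are: gevent, grequests, tornado, requests-futures, asyncio, etc.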
A - Multiprocessing.Pool()
I initially started with something like this:
def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
                ....

pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)
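Here I was connecting to a local server (localhost, port 5005) that was launched on 8 threads and supports parallel execution.
After a bit of searching I realised that the error I was getting was because requests opens a new connection/socket for each request. So this was actually too fast and exhausted the sockets after a while. The way to address this seems to be a requests.Session(), but I haven't been able to get that working with multiprocessing (where each process has its own session).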
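The socket exhaustion described above can be reproduced on a small scale without OSRM. The sketch below is only an illustration under stated assumptions: it uses the standard library's http.server and http.client as stand-ins for the OSRM server and for requests/urllib3, and the names `Handler`, `seen_ports` and `count_client_ports` are made up for this example. It counts how many distinct client-side ports the server sees when a connection is reused versus when every request opens a new one.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

seen_ports = []  # client-side source port of every request the server receives


class Handler(BaseHTTPRequestHandler):
    protocol_version = "HTTP/1.1"  # lets the server keep connections alive

    def do_GET(self):
        seen_ports.append(self.client_address[1])
        body = b'{"status": 200}'
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the demo quiet


def count_client_ports(n_requests, reuse):
    """Send n_requests GETs and return how many distinct client ports were used."""
    seen_ports.clear()
    server = ThreadingHTTPServer(("127.0.0.1", 0), Handler)  # port 0: pick any free port
    threading.Thread(target=server.serve_forever, daemon=True).start()
    host, port = server.server_address
    shared = http.client.HTTPConnection(host, port) if reuse else None
    try:
        for _ in range(n_requests):
            conn = shared if reuse else http.client.HTTPConnection(host, port)
            conn.request("GET", "/viaroute")
            conn.getresponse().read()  # drain the body so the socket can be reused
            if not reuse:
                conn.close()
    finally:
        if shared is not None:
            shared.close()
        server.shutdown()
        server.server_close()
    return len(set(seen_ports))
```

With `reuse=True` every request travels over the same socket, so only one local port is consumed. With `reuse=False` each request takes a fresh ephemeral port, and closed connections typically linger in TIME_WAIT for a while, which is how a fast enough client can run out of ports and hit WinError 10048.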
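Question 1.
On some computers this runs fine, e.g.:
For later comparison: 45% server usage and 1700 requests per second
On others, however, it does not, and I don't fully understand why:
HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
My guess is that, because requests locks the socket while it is in use, the server is sometimes too slow to respond to the old request and a new one is generated. The server supports queueing but requests does not, so instead of being added to a queue the request fails with this error?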
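Question 2.
I found:
Blocking Or Non-Blocking?
With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.
If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python's asynchronicity frameworks.
Two excellent examples are grequests and requests-futures.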
B - requests-futures
To address this I needed to rewrite my code to use asynchronous requests, so I tried the following, using:
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
(By the way, I start my server with the option to use all threads.)
And the main code:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
        # Process the futures as they become complete
        for future in as_completed(futures):
            r = future.result()
            try:
                row = [futures[future]] + r.data
            except Exception as err:
                print('No route')
                row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
            calc_routes.append(row)
My function (ReqOsrm) is now rewritten as:
def ReqOsrm(sess, resp):
    json_geocode = resp.json()
    status = int(json_geocode['status'])
    # Found route between points
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    # Cannot find route between points (code errors as 999)
    else:
        out = [999, 0, 0, 0, 0, 0, 0]
    resp.data = out
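However, this code is slower than the multiprocessing code! Before, I was getting around 1700 requests per second; now I am getting about 600. I guess this is because I don't have full CPU utilisation, but I'm not sure how to increase it?
C - Threads
I tried another method (creating threads), but again I wasn't sure how to get it to maximise CPU usage (ideally I want to see my server at 50%, no?):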
def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = resp.json()
        # Found route between points
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from = json_geocode['via_points'][0]
            used_to = json_geocode['via_points'][1]
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        else:
            print("Done but no route")
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("Error: %s" % err)
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    qres.put(out)
    return

#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in url_routes:
        q.put(url)
    q.join()
except Exception:
    pass
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
I think this method is faster than requests_futures, but I don't know how many threads to set to maximise it.
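One way to pick the thread count is simply to measure it. The following is a minimal sketch, not the code used for the numbers in this post: `measure_rps`, `sweep` and `fake_request` are hypothetical names, and `fake_request` only sleeps to imitate a short I/O wait, so it would need to be replaced by the real request function to say anything about OSRM.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def measure_rps(task, items, workers):
    """Run task over items with a thread pool and return items per second."""
    items = list(items)
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(task, items))  # consume the iterator to wait for all results
    elapsed = time.perf_counter() - start
    return len(items) / max(elapsed, 1e-9)


def sweep(task, items, worker_counts=(1, 2, 4, 8, 16, 32)):
    """Return {workers: requests per second} for each candidate pool size."""
    items = list(items)
    return {workers: measure_rps(task, items, workers) for workers in worker_counts}


def fake_request(item):
    # Hypothetical stand-in for getReq + processReq: about 5 ms of waiting per call.
    time.sleep(0.005)
    return item
```

Calling `sweep(fake_request, range(200))` returns a dict of throughput per pool size. With a real request function the curve usually flattens once the server or the GIL becomes the bottleneck, and the smallest pool size on that plateau is a reasonable choice.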
D - tornado (not working)
I am now trying tornado, but I can't quite get it working. With curl_httpclient it crashes with exit code -1073741819; with simple_httpclient it works, but then I get timeout errors:
ERROR:tornado.application:Multiple exceptions in yield list
Traceback (most recent call last):
  File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback
    result_list.append(f.result())
  File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "", line 3, in raise_exc_info
tornado.httpclient.HTTPError: HTTP 599: Timeout
def handle_req(r):
    try:
        json_geocode = json_decode(r)
        status = int(json_geocode['status'])
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        print(out)
    except Exception as err:
        print(err)
        out = [999, 0, 0, 0, 0, 0, 0]
    return out

# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    responses = yield [http_client.fetch(url) for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)

# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
E - asyncio/aiohttp
I decided to try another approach using asyncio and aiohttp (although tornado would be great).
import asyncio
import aiohttp

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    status = int(json_geocode['status'])
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    else:
        print("Done, but not route for {0} - status: {1}".format(qid, status))
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    return out

def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    @asyncio.coroutine
    # Return co-routine that will download files asynchronously and respect
    # locking of semaphore
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
        return body, qid
    return http_get

def run_experiment(urls):
    http_client = chunked_http_client(500)
    # http_client returns futures
    # save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))
This works OK, but it is still slower than multiprocessing!
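The generator-based style used above (`@asyncio.coroutine` with `yield from`) was removed from asyncio in Python 3.11, so for anyone reading this on a current interpreter, here is a rough sketch of the same semaphore-bounded pattern with async/await. It makes no network calls: `fake_fetch` is a hypothetical stand-in for the HTTP request, and `run_bounded` and `state` are names invented for this illustration.

```python
import asyncio


async def fake_fetch(url, qid, state):
    # Hypothetical stand-in for the HTTP call. With aiohttp this would be roughly:
    #     async with session.get(url) as resp:
    #         body = await resp.read()
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.001)
    state["active"] -= 1
    return b'{"status": 200}', qid


async def run_bounded(urls, limit):
    """Fetch every (url, qid) pair with at most `limit` requests in flight."""
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}  # only used to observe the concurrency level

    async def guarded(url, qid):
        async with semaphore:
            return await fake_fetch(url, qid, state)

    # gather() returns results in the same order as its arguments
    results = await asyncio.gather(*(guarded(url, qid) for url, qid in urls))
    return results, state["peak"]
```

It can be driven with `asyncio.run(run_bounded(url_routes, 100))`. The second return value is the highest number of simultaneous requests that was observed, which should never exceed `limit`.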
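Thanks all for the help. I wanted to post my conclusions:
Since my HTTP requests go to a local server that processes them instantly, async approaches don't make much sense for me (compared with the usual case where requests are sent over the internet). The costly part for me is actually sending the request and processing the response, which means I get better speeds with multiple processes (threads suffer from the GIL). I should also use sessions to increase speed (no need to close and re-open a connection to the SAME server) and to help prevent port exhaustion.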
Here are all the methods I tried (that worked), with example RPS (requests per second):
Serial
S1. Serial GET requests (no session) -> 215 RPS
def ReqOsrm(data):
    url, qid = data
    try:
        response = requests.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]

# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]
S2. Serial GET requests (requests.Session()) -> 335 RPS
session = requests.Session()

def ReqOsrm(data):
    url, qid = data
    try:
        response = session.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]

# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]
S3. Serial GET requests (urllib3.HTTPConnectionPool) -> 545 RPS
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)

def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]

# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]
Async IO
A4. AsyncIO with aiohttp -> 450 RPS
import asyncio
import aiohttp
concurrent = 100

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    tot_time_s = json_geocode['paths'][0]['time']
    tot_dist_m = json_geocode['paths'][0]['distance']
    return [qid, 200, tot_time_s, tot_dist_m]

def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    @asyncio.coroutine
    # Return co-routine that will download files asynchronously and respect
    # locking of semaphore
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            with aiohttp.ClientSession() as session:
                response = yield from session.get(url)
                body = yield from response.content.read()
                yield from response.wait_for_close()
        return body, qid
    return http_get

def run_experiment(urls):
    http_client = chunked_http_client(num_chunks=concurrent)
    # http_client returns futures, save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))
A5. Threading without sessions -> 330 RPS
from threading import Thread
from queue import Queue
concurrent = 100

def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return

#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
A6. Threading with HTTPConnectionPool -> 1550 RPS
from threading import Thread
from queue import Queue
from urllib3 import HTTPConnectionPool
concurrent = 100
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)

def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = conn_pool.request('GET', url)
        return resp.status, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return

#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
A7. requests-futures -> 520 RPS
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
concurrent = 100

def ReqOsrm(sess, resp):
    try:
        json_geocode = resp.json()
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err)
        out = [999, 0, 0]
    resp.data = out

#Run:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=concurrent)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0]
        calc_routes.append(row)
Multiple processes
P8. multiprocessing.worker + queue + requests.session() -> 1058 RPS
from multiprocessing import *

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout

    def run(self):
        s = requests.session()
        while not self.qin.empty():
            url, qid = self.qin.get()
            data = s.get(url)
            self.qout.put(ReqOsrm(data, qid))
            self.qin.task_done()

def ReqOsrm(resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid)
        return [qid, 999, 0, 0]

# Run:
qout = Queue()
qin = JoinableQueue()
[qin.put(url_q) for url_q in url_routes]
[Worker(qin, qout).start() for _ in range(cpu_count())]
qin.join()
calc_routes = []
while not qout.empty():
    calc_routes.append(qout.get())
P9. multiprocessing.worker + queue + HTTPConnectionPool() -> 1230 RPS
P10. multiprocessing v2 (not sure how this is different) -> 1350 RPS
conn_pool = None

def makePool(host, port):
    global conn_pool
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)

def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        return [qid, 999, 0, 0]

# Run:
pool = Pool(initializer=makePool, initargs=(ghost, gport))
calc_routes = pool.map(ReqOsrm, url_routes)
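In conclusion, it seems that for me the best methods are #10 (and, surprisingly, #6).
Looking at the multiprocessing code at the top of the question: it seems that HTTPConnectionPool() is called each time ReqOsrm is called, so a new pool is created for every URL. Instead, use the initializer and initargs parameters of Pool to create a single connection pool for each process.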
import json
from multiprocessing import Pool
from urllib3 import HTTPConnectionPool

conn_pool = None

def makePool(host, port):
    # Runs once in each worker process and stores that process's pool globally
    global conn_pool
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)

def ReqOsrm(url_input):
    ul, qid = url_input
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            # ul is the URL unpacked above (the original printed an undefined req_url)
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

if __name__ == "__main__":
    # run:
    pool = Pool(initializer=makePool, initargs=('127.0.0.1', 5005))
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()
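The requests-futures version appears to have an indentation error. The loop for future in as_completed(futures): is indented under the outer loop for i in range(len(url_routes)):. So a request is made in the outer loop, and then the inner loop waits for that future to return before the next iteration of the outer loop. This makes the requests run serially rather than in parallel.
I think the code should be as follows: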
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit all the requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # this was indented under the code in section B of the question
    # process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        print(row)
        calc_routes.append(row)
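The effect of the two loop layouts discussed in this answer can be checked without a server. This is a minimal sketch using only concurrent.futures from the standard library: `fake_request` is a hypothetical stand-in that sleeps instead of calling session.get(), and `nested_collect` and `submit_then_collect` are names invented for the comparison.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_request(qid, delay=0.05):
    # Hypothetical stand-in for session.get(): sleeps instead of doing I/O.
    time.sleep(delay)
    return [qid, 200]


def nested_collect(qids, workers=8):
    """as_completed() nested inside the submit loop."""
    rows, futures = [], {}
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for qid in qids:
            futures[executor.submit(fake_request, qid)] = qid
            # Blocks until everything submitted so far has finished, so the
            # next request is not sent until this one has returned.
            for future in as_completed(futures):
                rows.append(future.result())
    return rows, time.perf_counter() - start


def submit_then_collect(qids, workers=8):
    """Submit every request first, then collect the results."""
    rows, futures = [], {}
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for qid in qids:
            futures[executor.submit(fake_request, qid)] = qid
        for future in as_completed(futures):
            rows.append(future.result())
    return rows, time.perf_counter() - start
```

With 8 requests and 8 workers, the nested layout takes roughly 8 times the per-request delay, while the submit-then-collect layout takes roughly one delay. The nested layout also walks over the already finished futures again on every pass, so it appends 36 rows (1 + 2 + ... + 8) instead of 8.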