Igo*_*sky 253 python concurrency http
I'm opening a file with 100,000 URLs. I need to send an HTTP request to each URL and print the status code. I'm using Python 2.6, and so far I've seen a lot of confusing ways Python implements threading/concurrency. I've even looked at the python concurrence library, but can't figure out how to write this program correctly. Has anyone run into a similar problem? I guess what I really need to know is how to run thousands of tasks in Python as fast as possible - which presumably means 'concurrently'.
Tar*_*mán 186
Twisted-less solution:
from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    # worker: pull URLs off the queue forever
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    # issue a HEAD request and return (status, url)
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)
This is slightly faster than the Twisted solution and uses less CPU.
mhe*_*her 51
A solution using the Tornado asynchronous networking library:
from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
iro*_*ggy 38
Threads are absolutely not the answer here. They will create both process and kernel bottlenecks, as well as throughput limits that are unacceptable if the overall goal is "the fastest way".
A little bit of twisted and its asynchronous HTTP client would give you much better results.
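As a rough, untested Python 3 sketch of what that could look like (the Agent usage and file name below are my own illustration, not part of this answer):

# Minimal sketch: issue HEAD requests with Twisted's asynchronous HTTP client
# (twisted.web.client.Agent) instead of one thread per URL. Illustrative only.
from twisted.internet import reactor, defer
from twisted.web.client import Agent

agent = Agent(reactor)

def check(url):
    # agent.request returns a Deferred that fires with the response object
    d = agent.request(b"HEAD", url.encode("ascii"))
    d.addCallback(lambda resp: print(resp.code, url))
    d.addErrback(lambda failure: print("error", url))
    return d

def main():
    urls = [line.strip() for line in open("urllist.txt")]
    done = defer.DeferredList([check(u) for u in urls])
    done.addCallback(lambda _: reactor.stop())

reactor.callWhenRunning(main)
reactor.run()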
Gle*_*son 37
Things have changed quite a bit since this was posted in 2010. I haven't tried all the other answers, but I did try a few, and I found this worked best for me using python3.6.
I was able to fetch about 150 unique domains per second running on AWS.
import pandas as pd
import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)
            print(str(len(out)), end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())
use*_*461 24
(Note to self for the next project)
Just use requests. It's the simplest and fastest approach; there's no need for multiprocessing or complicated asynchronous libraries.
The most important aspect is reusing connections, especially for HTTPS (TLS needs an extra round trip to open a connection). Note that a connection is specific to a subdomain. If you scrape many pages across multiple domains, you can sort the list of URLs to maximize connection reuse (it effectively sorts by domain).
Given enough threads, it will be as fast as any async code (requests releases the Python GIL while waiting for the response).
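A small illustration of the sorting idea above (my own sketch, not from the answer; the file name is made up): grouping URLs by host before submitting them keeps requests to the same host adjacent, so a pooled connection can be reused.

# Hypothetical helper: order URLs so that all URLs sharing a host are adjacent,
# which maximizes reuse of the pooled (HTTPS) connections.
from urllib.parse import urlsplit

def sort_by_host(urls):
    return sorted(urls, key=lambda u: urlsplit(u).netloc)

urls = sort_by_host(open("urls.txt").read().splitlines())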
[Production-grade code with some logging and error handling]
import logging
import requests
import time

from concurrent.futures import ThreadPoolExecutor, as_completed

# source: /sf/answers/4800833271/

THREAD_POOL = 16

# This is how to create a reusable connection pool with python requests.
session = requests.Session()
session.mount(
    'https://',
    requests.adapters.HTTPAdapter(pool_maxsize=THREAD_POOL,
                                   max_retries=3,
                                   pool_block=True)
)

def get(url):
    response = session.get(url)
    logging.info("request was completed in %s seconds [%s]", response.elapsed.total_seconds(), response.url)
    if response.status_code != 200:
        logging.error("request failed, error code %s [%s]", response.status_code, response.url)
    if 500 <= response.status_code < 600:
        # server is overloaded? give it a break
        time.sleep(5)
    return response

def download(urls):
    with ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
        # wrap in a list() to wait for all requests to complete
        for response in list(executor.map(get, urls)):
            if response.status_code == 200:
                print(response.content)

def main():
    logging.basicConfig(
        format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    urls = [
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/404",
        "https://httpstat.us/503"
    ]
    download(urls)

if __name__ == "__main__":
    main()
Aks*_*ngh 16
Use grequests, which is a combination of requests + the Gevent module.
GRequests allows you to use Requests with Gevent to make asynchronous HTTP requests easily.
Usage is simple:
import grequests

urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]
Create a set of unsent Requests:
>>> rs = (grequests.get(u) for u in urls)
Send them all at the same time:
>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
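Since the question asks for status codes, a small follow-up on the same list of URLs might look like this (my own addition; with grequests.map, failed requests come back as None unless you pass an exception handler, and size caps the concurrency):

rs = (grequests.get(u) for u in urls)
# send in batches of 20 and print the status code (or "error") for each URL
for url, response in zip(urls, grequests.map(rs, size=20)):
    print(url, response.status_code if response is not None else "error")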
A good approach to solving this problem is to first write the code needed to get a single result, then incorporate threading code to parallelize the application.
In a perfect world this would simply mean starting 100,000 threads at once and having them output their results into a dictionary or list for later processing, but in practice you are limited in how many parallel HTTP requests you can issue this way. Locally, you are limited in how many sockets you can open concurrently and how many threads of execution your Python interpreter will allow. Remotely, you may be limited in the number of simultaneous connections, whether all the requests go to one server or to many. These limits will probably mean writing the script so that it only polls a small fraction of the URLs at any one time (100, as another poster mentioned, is probably a decent thread-pool size, although you may find you can successfully deploy many more).
You can follow this design pattern to solve the problem above: have the request threads store their results in a shared data structure for later retrieval. If that structure is a list or a dict in CPython, you can safely append or insert unique items from your threads without a lock, but if you write to a file or need more complex cross-thread data interaction, you should use a mutex to protect that state from corruption. I would suggest you use the threading module. You can use it to start and track running threads. Python's threading support is bare-bones, but the problem as described suggests it fully meets your needs.
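A bare-bones Python 3 sketch of that pattern (my own illustration; the pool size, file name, and helper names are made up for the example):

import threading
import urllib.request

MAX_CONCURRENT = 100            # a "decent thread pool size", as discussed above
slots = threading.Semaphore(MAX_CONCURRENT)
results = []                    # appending from many threads is safe in CPython

def fetch(url):
    try:
        results.append((url, urllib.request.urlopen(url).getcode()))
    except Exception:
        results.append((url, "error"))
    finally:
        slots.release()

threads = []
for line in open("urllist.txt"):
    slots.acquire()             # blocks until fewer than MAX_CONCURRENT are in flight
    t = threading.Thread(target=fetch, args=(line.strip(),))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print(len(results), "URLs checked")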
Finally, if you'd like to see a very simple example of a parallel network application written in Python, check out ssh.py. It's a small library that uses Python threads to parallelize many SSH connections. The design is close enough to your requirements that you may find it a good resource.
If you want the best possible performance, you may want to consider using asynchronous I/O rather than threads. The overhead associated with thousands of OS threads is non-trivial, and the context switching inside the Python interpreter adds even more on top of that. Threading will certainly get the job done, but I suspect the asynchronous route will give better overall performance.
Specifically, I'd suggest the asynchronous web client in the Twisted library (http://www.twistedmatrix.com). It has an admittedly steep learning curve, but once you get a good handle on Twisted's style of asynchronous programming, it's quite easy to use.
A HowTo on Twisted's asynchronous web client API is available at:
http://twistedmatrix.com/documents/current/web/howto/client.html
One approach:
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools

concurrent = 200
finished = itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response, url):
    print response, url
    processedOne()

def processError(error, url):
    print "error", url #, error
    processedOne()

def processedOne():
    if finished.next() == added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)

added = 0
for url in open('urllist.txt'):
    added += 1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()
Timings:
[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null
real 1m10.682s
user 0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
Pingtime:
bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
I know this is an old question, but in Python 3.7 you can do this using asyncio and aiohttp.
import asyncio

import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))
You can read more about this approach and see an example here.
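One caveat when feeding this 100,000 URLs: asyncio.gather starts every task at once. A common way to cap concurrency (my own addition, not part of the answer) is to wrap the fetch in an asyncio.Semaphore:

# Hypothetical variant of the code above: at most 200 requests in flight at a time.
sem = asyncio.Semaphore(200)

async def fetch_html_limited(url: str, session: ClientSession, **kwargs) -> tuple:
    async with sem:
        return await fetch_html(url, session, **kwargs)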