在Python中发送100,000个HTTP请求的最快方法是什么?

Igo*_*sky 253 python concurrency http

我正在打开一个有100,000个URL的文件.我需要向每个URL发送HTTP请求并打印状态代码.我正在使用Python 2.6,到目前为止,我看到了Python实现线程/并发的许多令人困惑的方式.我甚至看过python concurrence库,但无法弄清楚如何正确编写这个程序.有没有人遇到过类似的问题?我想通常我需要知道如何尽快在Python中执行数千个任务 - 我想这意味着'同时'.

Tar*_*mán 186

无双解决方案:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)
Run Code Online (Sandbox Code Playgroud)

这个比扭曲的解决方案快一点,并且使用更少的CPU.

  • @Kalmi,为什么要将Queue设置为`concurrent*2`? (9认同)
  • 不要忘记[关闭连接](http://docs.python.org/2/library/httplib.html#httplib.HTTPConnection.close)`conn.close()`.打开太多的http连接可能会在某些时候停止脚本并占用内存. (7认同)
  • @hyh,`Queue`模块已在Python 3中重命名为`queue`.这是Python 2代码. (4认同)
  • 如果你想通过持久连接想要每次与SAME服务器通话,你能走多快?这甚至可以跨线程完成,还是每个线程有一个持久连接? (3认同)
  • @mptevsion,如果您正在使用CPython,您可以(例如)将"print status,url"替换为"my_global_list.append((status,url))".由于GIL,(大多数操作)列表在CPython(以及其他一些python实现)中是隐式线程安全的,所以这样做是安全的. (2认同)

mhe*_*her 51

使用龙卷风异步网络库的解决方案

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
Run Code Online (Sandbox Code Playgroud)

  • 此代码使用非阻塞网络I/O,没有任何限制.它可以扩展到成千上万的开放连接.它将在单个线程中运行,但将比任何线程解决方案更快.检查非阻塞I/O https://en.wikipedia.org/wiki/Asynchronous_I/O (6认同)
  • 它是一个计数器,用于确定何时退出``ioloop` - 所以当你完成时. (4认同)
  • @Guy Avraham祝你的ddos计划获得帮助. (4认同)
  • @mher我测试了你的代码,但我只得到 599 响应代码。不知道哪里可能出问题吗?谢谢 (2认同)
  • @mher - 如果我对响应根本不感兴趣,这意味着只想尽可能快地向服务器发送尽可能多的请求,我应该在上面的示例中修改什么(如果有)?谢谢 !! (2认同)
  • @Walter - 你明白了 :) 其实我是在做一些非常幼稚的“压力测试” (2认同)

iro*_*ggy 38

线程绝对不是这里的答案.它们将提供进程和内核瓶颈,以及如果总体目标是"最快的方式"则不能接受的吞吐量限制.

一点点twisted,它的异步HTTP客户端会给你更好的结果.

  • 您可以避免使用100k内容填充队列.只需从输入中一次处理一个项目,然后启动一个线程来处理与每个项目相对应的请求.(如下所述,当线程数低于某个阈值时,使用启动程序线程启动HTTP请求线程.让线程将结果写入dict映射URL以响应,或将元组附加到列表.) (2认同)

Gle*_*son 37

自2010年发布以来,事情发生了很大的变化,我没有尝试过所有其他答案,但我尝试了一些,我发现这对我来说最适合使用python3.6.

我能够在AWS上每秒获取大约150个独特的域名.

import pandas as pd
import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())
Run Code Online (Sandbox Code Playgroud)

  • 我只是问,因为我不知道,但是这个 futures 的东西可以用 async/await 代替吗? (2认同)
  • 可以,但我发现上述方法效果更好。您可以使用 aiohttp,但它不是标准库的一部分,并且正在发生很大变化。它确实有效,但我只是没有发现它也有效。当我使用它时,我得到了更高的错误率,并且在我的一生中我无法让它像并发期货一样工作,尽管理论上它似乎应该工作得更好,请参阅:https://stackoverflow.com/questions /45800857/aiohttp-error-rate-increases-with-number-of-connections 如果您让它运行良好,请发布您的答案,以便我可以对其进行测试。 (2认同)

use*_*461 24

(下一个项目的自我注释)

仅使用requests. 它是最简单且快速的,不需要多处理或复杂的异步库。

最重要的方面是重用连接,特别是对于 HTTPS(TLS 需要额外的往返才能打开)。请注意,连接特定于子域​​。如果您抓取多个域上的多个页面,您可以对 URL 列表进行排序,以最大限度地提高连接重用(它有效地按域排序)。

如果有足够的线程,它将与任何异步代码一样快。(请求在等待响应时释放 python GIL)。

[带有一些日志记录和错误处理的生产级代码]

import logging
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# source: /sf/answers/4800833271/

THREAD_POOL = 16

# This is how to create a reusable connection pool with python requests.
session = requests.Session()
session.mount(
    'https://',
    requests.adapters.HTTPAdapter(pool_maxsize=THREAD_POOL,
                                  max_retries=3,
                                  pool_block=True)
)

def get(url):
    response = session.get(url)
    logging.info("request was completed in %s seconds [%s]", response.elapsed.total_seconds(), response.url)
    if response.status_code != 200:
        logging.error("request failed, error code %s [%s]", response.status_code, response.url)
    if 500 <= response.status_code < 600:
        # server is overloaded? give it a break
        time.sleep(5)
    return response

def download(urls):
    with ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
        # wrap in a list() to wait for all requests to complete
        for response in list(executor.map(get, urls)):
            if response.status_code == 200:
                print(response.content)

def main():
    logging.basicConfig(
        format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    urls = [
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/404",
        "https://httpstat.us/503"
    ]

    download(urls)

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)


Aks*_*ngh 16

使用grequests,它是请求+ Gevent模块的组合.

GRequests允许您使用带有Gevent的请求轻松地生成异步HTTP请求.

用法很简单:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]
Run Code Online (Sandbox Code Playgroud)

创建一组未发送的请求:

>>> rs = (grequests.get(u) for u in urls)
Run Code Online (Sandbox Code Playgroud)

同时发送所有内容:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
Run Code Online (Sandbox Code Playgroud)

  • grequests不是正常请求的一部分,似乎很大程度上没有维护 (13认同)
  • gevent现在支持python 3 (6认同)

Eri*_*son 8

解决此问题的一个好方法是首先编写获取一个结果所需的代码,然后合并线程代码以并行化应用程序.

在一个完美的世界中,这只是意味着同时启动100,000个线程,将其结果输出到字典或列表中以供以后处理,但实际上,您可以以这种方式发出多少并行HTTP请求.在本地,您可以同时打开多少个套接字,Python解释器允许执行多少个线程.远程地,如果所有请求都针对一个服务器或多个请求,则可能会限制同时连接的数量.这些限制可能需要您编写脚本,以便在任何时候只调查一小部分URL(100,正如另一张海报所提到的,可能是一个不错的线程池大小,尽管你可能会发现你可以成功部署更多).

您可以按照此设计模式解决上述问题:

  1. 启动一个启动新请求线程的线程,直到当前正在运行的线程数(您可以通过threading.active_count()或通过将线程对象推送到数据结构中来跟踪它们)> =您的最大并发请求数(比如100) ,然后睡一会儿.当没有更多URL要处理时,该线程应该终止.因此,线程将继续唤醒,启动新线程,并在完成之前休眠.
  2. 让请求线程将其结果存储在某些数据结构中,以便以后检索和输出.如果要存储结果的结构是CPython,list或者dict在CPython中,您可以安全地附加或插入线程中的唯一项而不使用锁,但如果您写入文件或需要更复杂的跨线程数据交互,则应使用互斥锁定保护这个国家免受腐败.

我建议你使用线程模块.您可以使用它来启动和跟踪正在运行的线程.Python的线程支持是裸的,但对问题的描述表明它完全满足您的需求.

最后,如果你想看到用Python编写的并行网络应用的一个非常简单的应用程序,请ssh.py.它是一个小型库,它使用Python线程来并行化许多SSH连接.设计非常接近您的要求,您可能会发现它是一个很好的资源.


Rak*_*kis 7

如果您希望获得最佳性能,可能需要考虑使用异步I/O而不是线程.与数以千计的OS线程相关的开销非常重要,Python解释器中的上下文切换在它之上增加了更多.线程肯定会完成工作,但我怀疑异步路由将提供更好的整体性能.

具体来说,我建议在Twisted库(http://www.twistedmatrix.com)中使用异步Web客户端.它有一个公认的陡峭的学习曲线,但是一旦你很好地处理了Twisted的异步编程风格,它就很容易使用.

Twisted的异步Web客户端API上的HowTo可在以下位置获得:

http://twistedmatrix.com/documents/current/web/howto/client.html


Tar*_*mán 5

一个办法:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()
Run Code Online (Sandbox Code Playgroud)

原料与材料:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
Run Code Online (Sandbox Code Playgroud)

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
Run Code Online (Sandbox Code Playgroud)

  • 使用Twisted作为线程池忽略了您可以从中获得的大部分好处.您应该使用异步HTTP客户端. (6认同)

Mar*_*scu 5

我知道这是一个老问题,但是在Python 3.7中,您可以使用asyncio和来实现aiohttp

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))
Run Code Online (Sandbox Code Playgroud)

您可以阅读有关它的更多信息,并在此处查看示例。