我用的很asyncio漂亮aiohttp.主要的想法是我向服务器发出请求(它返回链接),然后我想从所有链接并行下载文件(类似于一个示例).
码:
import aiohttp
import asyncio
@asyncio.coroutine
def downloader(file):
print('Download', file['title'])
yield from asyncio.sleep(1.0) # some actions to download
print('OK', file['title'])
def run():
r = yield from aiohttp.request('get', 'my_url.com', True))
raw = yield from r.json()
tasks = []
for file in raw['files']:
tasks.append(asyncio.async(downloader(file)))
asyncio.wait(tasks)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试运行它时,我有许多"下载..."输出和
Task was destroyed but it is pending!
Run Code Online (Sandbox Code Playgroud)
而且没有'OK + filename'.
我该如何解决这个问题?
我想使用龙卷风和asyncio库,如aiohttp和本机python 3.5协同程序,它似乎在最新的龙卷风版本(4.3)中得到支持.但是,在龙卷风事件循环中使用它时,请求处理程序将无限期挂起.当不使用aiohttp(即没有行r = await aiohttp.get('http://google.com/')和text = await r.text()下面)时,请求处理程序正常进行.
我的测试代码如下:
from tornado.ioloop import IOLoop
import tornado.web
import tornado.httpserver
import aiohttp
IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')
class MainHandler(tornado.web.RequestHandler):
async def get(self):
r = await aiohttp.get('http://google.com/')
text = await r.text()
self.write("Hello, world, text is: {}".format(text))
if __name__ == "__main__":
app = tornado.web.Application([
(r"/", MainHandler),
])
server = tornado.httpserver.HTTPServer(app)
server.bind(8888, '127.0.0.1')
server.start()
IOLoop.current().start()
Run Code Online (Sandbox Code Playgroud) 我正在试验limit和limit_per_host参数aiohttp.connector.TCPConnector.
在下面的脚本中,我connector = aiohttp.connector.TCPConnector(limit=25, limit_per_host=5)转到aiohttp.ClientSession,然后打开2个请求到docs.aiohttp.org和3到github.com.
结果session.request是一个实例aiohttp.ClientResponse,在这个例子中我故意不.close()通过.close()或者调用它__aexit__.我认为这会使连接池保持打开状态,并减少与(host,ssl,port)三倍的可用连接数-1.
下表表示._available_connections()每个请求之后. 即使在完成对docs.aiohttp.org的第二次请求后,为什么数字仍为4? 这两个连接可能仍然是开放的,尚未访问._content或已关闭.可用连接不应减少1吗?
After Request Num. To _available_connections
1 docs.aiohttp.org 4
2 docs.aiohttp.org 4 <--- Why?
3 github.com 4
4 github.com 3
5 github.com 2
Run Code Online (Sandbox Code Playgroud)
此外,为什么._acquired_per_host只包含1个键? 我想我可能会理解的方法TCPConnector; 是什么解释了上面的行为?
完整脚本:
import aiohttp
async def main():
connector = aiohttp.connector.TCPConnector(limit=25, limit_per_host=5)
print("Connector arguments:")
print("_limit:", connector._limit)
print("_limit_per_host:", connector._limit_per_host)
print("-" …Run Code Online (Sandbox Code Playgroud) I'm trying to make an asynchronous web scraper using beautifulsoup and aiohttp.This is my initial code to start things.I'm getting a [TypeError: An asyncio.Future, a coroutine or an awaitable is required] and having a hard time figuring out what is wrong with my code.I am new to python and would appreciate any help regarding this.
import bs4
import asyncio
import aiohttp
async def parse(page):
soup=bs4.BeautifulSoup(page,'html.parser')
soup.prettify()
print(soup.title)
async def request():
async with aiohttp.ClientSession() as session:
async with session.get("https://google.com") as resp: …Run Code Online (Sandbox Code Playgroud) 当我使用 时aiohttp.web.run_app(. . ., port=0),我假设它选择了一个任意可用的端口来提供服务。这样对吗?如果是这样,有没有办法弄清楚它选择了哪个端口?
我写了这段代码:
import asyncio
import threading
from aiohttp import ClientSession
async def fetch(url):
async with ClientSession() as session:
async with session.get(url) as response:
response = await response.read()
print(threading.current_thread().name)
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(fetch("http://example.com")) for i in range(5)]
loop.run_until_complete(asyncio.wait(tasks))
Run Code Online (Sandbox Code Playgroud)
它每次打印出"MainThread".这是否意味着所有请求都是使用相同的主线程并发执行的,并且它不是使用线程池中的线程来执行每个请求,还是被抽象的线程?
这篇文章似乎表明,事实上有一个Python用于这些异步任务的线程池:http: //skipperkongen.dk/2016/09/09/easy-parallel-http-requests-with-python-and-asyncio /
简单示例:我需要并行创建两个不相关的HTTP请求.最简单的方法是什么?我希望它是这样的:
async def do_the_job():
with aiohttp.ClientSession() as session:
coro_1 = session.get('http://httpbin.org/get')
coro_2 = session.get('http://httpbin.org/ip')
return combine_responses(await coro_1, await coro_2)
Run Code Online (Sandbox Code Playgroud)
换句话说,我想启动IO操作并等待其结果,以便它们有效地并行运行.这可以通过以下方式实现asyncio.gather:
async def do_the_job():
with aiohttp.ClientSession() as session:
coro_1 = session.get('http://example.com/get')
coro_2 = session.get('http://example.org/tp')
return combine_responses(*(await asyncio.gather(coro_1, coro_2)))
Run Code Online (Sandbox Code Playgroud)
接下来,我想要一些复杂的依赖结构.我希望在我拥有所有先决条件时开始操作,并在需要结果时获得结果.这里有助于asyncio.ensure_future分别从事件循环管理的协程中执行单独的任务:
async def do_the_job():
with aiohttp.ClientSession() as session:
fut_1 = asyncio.ensure_future(session.get('http://httpbin.org/ip'))
coro_2 = session.get('http://httpbin.org/get')
coro_3 = session.post('http://httpbin.org/post', data=(await coro_2)
coro_3_result = await coro_3
return combine_responses(await fut_1, coro_3_result)
Run Code Online (Sandbox Code Playgroud)
是不是在我的逻辑流程中使用协同程序实现并行非阻塞IO,我必须使用asyncio.ensure_future或者asyncio.gather(实际使用asyncio.ensure_future)?是否有一种不那么"冗长"的方式?
通常开发人员必须考虑协同应该成为单独的任务并使用上述功能来获得最佳性能,这是真的吗?
在事件循环中使用没有多个任务的协同程序是否有意义?
现实生活中的事件循环任务有多"重"?当然,它们比OS线程或进程"更轻".我应该在多大程度上争取尽可能少的此类任务?
我尝试重用 HTTP 会话作为 aiohttp 文档建议
不要为每个请求创建一个会话。很可能您需要为每个应用程序创建一个会话来共同执行所有请求。
但是我与请求库一起使用的通常模式不起作用:
def __init__(self):
self.session = aiohttp.ClientSession()
async def get_u(self, id):
async with self.session.get('url') as resp:
json_resp = await resp.json()
return json_resp.get('data', {})
Run Code Online (Sandbox Code Playgroud)
然后我尝试
await client.get_u(1)
Run Code Online (Sandbox Code Playgroud)
我有错误
RuntimeError: Timeout context manager should be used inside a task
Run Code Online (Sandbox Code Playgroud)
任何 async_timeout 的解决方法都没有帮助。
另一种方法是工作:
async def get_u(self, id):
async with aiohttp.ClientSession() as session:
with async_timeout.timeout(3):
async with session.get('url') as resp:
json_resp = await resp.json()
return json_resp.get('data', {})
Run Code Online (Sandbox Code Playgroud)
但似乎每个请求都创建会话。
所以我的问题是:如何正确重用 aiohttp-session?
UPD:最小工作示例。具有以下视图的 Sanic 应用程序
import aiohttp
from sanic.views import …Run Code Online (Sandbox Code Playgroud) 我被一个似乎与asyncio+ 相关的问题困扰,aiohttp因此,当发送大量并发GET请求时,超过85%的请求引发了一个aiohttp.client_exceptions.ClientConnectorError异常,最终源于
socket.gaierror(8, 'nodename nor servname provided, or not known')
Run Code Online (Sandbox Code Playgroud)
在主机/端口上发送单个GET请求或执行基础DNS解析时,不会引发此异常.
虽然在我的真实代码中,我正在进行大量自定义,例如使用自定义TCPConnector实例,但我可以仅使用"默认" aiohttp类实例和参数来重现问题,如下所示.
我跟踪了回溯,异常的根与DNS解析有关.它来自于调用的_create_direct_connection方法.aiohttp.TCPConnector._resolve_host()
我也尝试过:
aiodnssudo killall -HUP mDNSResponderfamily=socket.AF_INET作为参数TCPConnector(虽然我很确定aiodns无论如何都使用它).这使用2而不是默认的int 0到该参数ssl=True和ssl=False一切都无济于事.
完整的代码重现如下.输入的URL位于https://gist.github.com/bsolomon1124/fc625b624dd26ad9b5c39ccb9e230f5a.
import asyncio
import itertools
import aiohttp
import aiohttp.client_exceptions
from yarl import URL
ua = itertools.cycle(
(
"Mozilla/5.0 (X11; Linux i686; rv:64.0) Gecko/20100101 Firefox/64.0",
"Mozilla/5.0 (Macintosh; U; Intel …Run Code Online (Sandbox Code Playgroud) 我编写了一个Python 3.7脚本,该脚本使用单个语句查询的多个对象异步(asyncio 3.4.3 and aiohttp 3.5.4)创建Salesforce批量 API(v45.0)作业/批处理SOQL,等待批处理完成,完成后将结果下载(流式传输)到服务器,进行一些数据转换,以及然后最后将结果同步上传到SQL Server 2016 SP1 (13.0.4560.0). 我已经进行了很多成功的试运行,并认为它运行良好,但是,我最近开始间歇性地收到以下错误,并且对如何修复感到不知所措,因为很少有关于此的报告/解决方案在网上:
aiohttp.client_exceptions.ClientPayloadError:响应负载未完成
示例代码片段:
import asyncio,aiohttp,aiofiles
from simple_salesforce import Salesforce
from xml.etree import ElementTree
#Establish a session using the simple_salesforce module
sf = Salesforce(username=username,
password=password,
security_token=securityToken,
organizationId=organizationId)
sfAPIURL = 'https://myinstance.salesforce.com/services/async/45.0/job/'
sfDataPath = 'C:/Salesforce/Data/'
#Dictionary to store information for the object/job/batch while the script is executing
objectDictionary =
{'Account': {'job':
{'batch': {'id': '8596P00000ihwpJulI','results': ['8596V00000Bo9iU'],'state': 'Completed'},
'id': '8752R00000iUjtReqS'},
'soql': 'select …Run Code Online (Sandbox Code Playgroud) aiohttp ×10
python ×9
python-3.x ×5
asynchronous ×2
async-await ×1
salesforce ×1
sanic ×1
tornado ×1