我有一个函数,它接受常规和异步函数(不是协程,而是返回协程的函数)。
它在内部使用asyncio.iscoroutinefunction() test来查看它获得了哪种类型的功能。
最近,当我尝试创建部分异步函数时,它崩溃了。
在这个演示中,ptest 不被识别为一个协程函数,即使它返回一个协程,即ptest() 是一个协程。
import asyncio
import functools
async def test(arg): pass
print(asyncio.iscoroutinefunction(test)) # True
ptest = functools.partial(test, None)
print(asyncio.iscoroutinefunction(ptest)) # False!!
print(asyncio.iscoroutine(ptest())) # True
Run Code Online (Sandbox Code Playgroud)
问题原因很清楚,但解决方案却不是。
如何动态创建通过测试的部分异步函数?
或者
如何测试包裹在部分对象中的 func ?
任何一个答案都可以解决问题。
当我在 Python 3.7 中运行此代码时:
import asyncio
sem = asyncio.Semaphore(2)
async def work():
async with sem:
print('working')
await asyncio.sleep(1)
async def main():
await asyncio.gather(work(), work(), work())
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
它因运行时错误而失败:
$ python3 demo.py
working
working
Traceback (most recent call last):
File "demo.py", line 13, in <module>
asyncio.run(main())
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "demo.py", line 11, in main
await asyncio.gather(work(), work(), work())
File "demo.py", line 6, in work
async with sem:
File …Run Code Online (Sandbox Code Playgroud) 我正在尝试获取一对多关系中匹配的行数。当我尝试时parent.children_count我得到:
sqlalchemy.exc.MissingGreenlet:greenlet_spawn尚未被调用;不能在这里调用await_only()。是否在意想不到的地方尝试了 IO?(此错误的背景位于:https ://sqlalche.me/e/14/xd2s )
我添加了expire_on_commit=False但仍然遇到相同的错误。我怎样才能解决这个问题?
import asyncio
from uuid import UUID, uuid4
from sqlmodel import SQLModel, Relationship, Field
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
class Parent(SQLModel, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True)
children: list["Child"] = Relationship(back_populates="parent")
@property
def children_count(self):
return len(self.children)
class Child(SQLModel, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True)
parent_id: UUID = Field(default=None, foreign_key=Parent.id)
parent: "Parent" = Relationship(back_populates="children")
async def main():
engine = create_async_engine("sqlite+aiosqlite://")
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
async with AsyncSession(engine) as …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Asyncio和aiohttp库发出一堆请求(~1000),但我遇到了一个我找不到太多信息的问题.
当我用10个网址运行这个代码时,它运行得很好.当我用100多个网址运行它时,它会中断并给我RuntimeError: Event loop is closed错误.
import asyncio
import aiohttp
@asyncio.coroutine
def get_status(url):
code = '000'
try:
res = yield from asyncio.wait_for(aiohttp.request('GET', url), 4)
code = res.status
res.close()
except Exception as e:
print(e)
print(code)
if __name__ == "__main__":
urls = ['https://google.com/'] * 100
coros = [asyncio.Task(get_status(url)) for url in urls]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(coros))
loop.close()
Run Code Online (Sandbox Code Playgroud)
堆栈跟踪可以在这里找到.
任何帮助或洞察都会非常感激,因为我现在已经敲了几个小时.显然,这表明事件循环已经关闭,应该仍然是开放的,但我不知道这是怎么可能的.
我正在努力写出一些尽可能惯用的东西来收集存储在字典中的期货的结果.
我们假设我有以下代码:
import asyncio
async def sleep(seconds):
print(f'sleeping for {seconds} seconds')
await asyncio.sleep(seconds)
print(f'finished sleeping {seconds} seconds')
async def run():
tasks = {
'4': sleep(4),
'3': sleep(3),
'2': sleep(2),
'1': sleep(1),
}
print(await gather_from_dict(tasks))
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(run())
Run Code Online (Sandbox Code Playgroud)
我期待的输出是:
sleeping for 2 seconds
sleeping for 1 seconds
sleeping for 4 seconds
sleeping for 3 seconds
finished sleeping 1 seconds
finished sleeping 2 seconds
finished sleeping 3 seconds
finished sleeping 4 seconds
{'4': None, '3': None, '2': None, '1': None} …Run Code Online (Sandbox Code Playgroud) 我无法理解asyncio.create_task()Python 3.7 中引入的函数应该如何工作。如果我做:
import asyncio
async def helloworld():
print("Hello world from a coroutine!")
asyncio.create_task(helloworld())
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(helloworld())
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
我得到:
Hello world from a coroutine!
Hello world from a coroutine!
作为输出(即协程运行两次)。这怎么不是无限递归呢?当我使用await关键字时,我希望看到我看到的内容:
import asyncio
async def helloworld():
print("Hello world from a coroutine!")
await helloworld()
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(helloworld())
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
有了这个,我得到:
Hello world from a coroutine!
Hello world from a coroutine!
Hello world from a coroutine! …
我正在使用 api,有时它会给出一些奇怪的状态代码,只需重试相同的请求就可以修复这些代码。我正在使用 aiohttp 异步地向这个 api 发出请求
我也在使用退避库来重试请求,但是似乎仍然没有重试 401 请求。
@backoff.on_exception(backoff.expo, aiohttp.ClientError, max_tries=11, max_time=60)
async def get_user_timeline(self, session, user_id, count, max_id, trim_user, include_rts, tweet_mode):
params = {
'user_id': user_id,
'trim_user': trim_user,
'include_rts': include_rts,
'tweet_mode': tweet_mode,
'count': count
}
if (max_id and max_id != -1):
params.update({'max_id': max_id})
headers = {
'Authorization': 'Bearer {}'.format(self.access_token)
}
users_lookup_url = "/1.1/statuses/user_timeline.json"
url = self.base_url + users_lookup_url
async with session.get(url, params=params, headers=headers) as response:
result = await response.json()
response = {
'result': result,
'status': response.status,
'headers': response.headers …Run Code Online (Sandbox Code Playgroud) 我最近偶然发现了混淆gevent和基于asyncio的代码的问题,因为当我用猴子修补它们时,某些同步库可以很好地工作gevent.monkey.patch_all()。我找到了该aiogevent库,该库似乎对实现PEP 3156有所帮助,并将asyncio事件循环替换为您选择的其他实现(在本例中为gevent)。我发现的git仓库的最后一次重大提交是在4年前。修复setup.py之后,我设法成功安装了它,但是问题是它没有通过所有测试。
这些测试之一是test_soon,它产生应该执行操作并停止循环的greenlet。该测试将永远挂起,因为loop.stop()对循环没有任何影响,该循环预计将在所有任务完成后停止。我写了两个代码片段来检查它是否在传统的协程中发生,而另一个则通过来检查gevent.spawn。
import gevent
import aiogevent
import asyncio
asyncio.set_event_loop_policy(aiogevent.EventLoopPolicy())
loop = asyncio.get_event_loop()
async def func():
print('bloop')
loop.stop()
loop.create_task(func())
loop.run_forever() # works alright and stops as soon as func finish
Run Code Online (Sandbox Code Playgroud)
和gevent.spawn:
import gevent
import aiogevent
import asyncio
asyncio.set_event_loop_policy(aiogevent.EventLoopPolicy())
loop = asyncio.get_event_loop()
def func():
print('bloop')
loop.stop()
g = gevent.spawn(func)
loop.run_forever() # func is executed as soon as loop runs, but loop.stop() is ignored
Run Code Online (Sandbox Code Playgroud)
问题是:这里可能出什么问题?我清楚地看到,在启动循环之后,greenlet会运行,但是循环是否“未跟踪”?我在异步源中找不到与该机制相对应的确切行,对于gevent来说也是如此-我对这些模块的内部并不十分熟悉,并且通过它们进行搜索令人困惑,但是我想知道有什么区别和aiogevent为了通过测试,必须对事件循环进行哪些更改。
upd1:为了强调该问题,gevent.hub.Hub …
我正在使用 aiohttp 发出异步请求,我想测试我的代码。我想模拟 aiohttp.ClientSession 发送的请求。我正在寻找类似于响应处理requestslib模拟的方式。
我如何模拟aiohttp.ClientSession?
# sample method
async def get_resource(self, session):
async with aiohttp.ClientSession() as session:
response = await self.session.get("some-external-api.com/resource")
if response.status == 200:
result = await response.json()
return result
return {...}
# I want to do something like ...
aiohttp_responses.add(
method='GET',
url="some-external-api.com/resource",
status=200,
json={"message": "this worked"}
)
async def test_get_resource(self):
result = await get_resource()
assert result == {"message": "this worked"}
Run Code Online (Sandbox Code Playgroud)
编辑
我在几个项目中使用了https://github.com/pnuckowski/aioresponses,它非常适合我的需求。
阅读asyncio 文档,我意识到我不理解一个非常基本和基本的方面:直接等待协程与等待包含在任务中的同一个协程之间的区别。
在文档示例中,对say_after协程的两次调用在没有等待时顺序运行create_task,而在包装中时并发运行create_task。所以我明白这基本上就是区别,这是一个非常重要的区别。
然而让我困惑的是,在我到处阅读的示例代码中(例如展示如何使用aiohttp),有很多地方等待(用户定义的)协程(通常在其他一些用户定义的协程中间)而没有被包裹在一个任务中,我想知道为什么会这样。确定何时应将协程包装在任务中的标准是什么?
python-asyncio ×10
python ×8
python-3.x ×5
aiohttp ×3
asynchronous ×2
gevent ×1
idioms ×1
semaphore ×1
sqlalchemy ×1
sqlite ×1
sqlmodel ×1