标签: python-asyncio

部分异步函数未被检测为异步

我有一个函数,它接受常规和异步函数(不是协程,而是返回协程的函数)。

它在内部使用asyncio.iscoroutinefunction() test来查看它获得了哪种类型的功能。

最近,当我尝试创建部分异步函数时,它崩溃了。

在这个演示中,ptest 被识别为一个协程函数,即使它返回一个协程,即ptest() 一个协程。

import asyncio
import functools

async def test(arg): pass
print(asyncio.iscoroutinefunction(test))    # True

ptest = functools.partial(test, None)
print(asyncio.iscoroutinefunction(ptest))   # False!!

print(asyncio.iscoroutine(ptest()))         # True
Run Code Online (Sandbox Code Playgroud)

问题原因很清楚,但解决方案却不是。

如何动态创建通过测试的部分异步函数?

或者

如何测试包裹在部分对象中的 func ?

任何一个答案都可以解决问题。

python python-asyncio

15
推荐指数
2
解决办法
5364
查看次数

asyncio.Semaphore RuntimeError: Task got Future 附加到不同的循环

当我在 Python 3.7 中运行此代码时:

import asyncio

sem = asyncio.Semaphore(2)

async def work():
    async with sem:
        print('working')
        await asyncio.sleep(1)

async def main():
    await asyncio.gather(work(), work(), work())

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

它因运行时错误而失败:

$ python3 demo.py
working
working
Traceback (most recent call last):
  File "demo.py", line 13, in <module>
    asyncio.run(main())
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "demo.py", line 11, in main
    await asyncio.gather(work(), work(), work())
  File "demo.py", line 6, in work
    async with sem:
  File …
Run Code Online (Sandbox Code Playgroud)

python semaphore python-3.x python-asyncio

15
推荐指数
1
解决办法
3575
查看次数

MissingGreenlet:greenlet_spawn 尚未被调用

我正在尝试获取一对多关系中匹配的行数。当我尝试时parent.children_count我得到:

sqlalchemy.exc.MissingGreenlet:greenlet_spawn尚未被调用;不能在这里调用await_only()。是否在意想不到的地方尝试了 IO?(此错误的背景位于:https ://sqlalche.me/e/14/xd2s )

我添加了expire_on_commit=False但仍然遇到相同的错误。我怎样才能解决这个问题?

import asyncio
from uuid import UUID, uuid4
from sqlmodel import SQLModel, Relationship, Field
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

class Parent(SQLModel, table=True):
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    children: list["Child"] = Relationship(back_populates="parent")
    @property
    def children_count(self):
        return len(self.children)

class Child(SQLModel, table=True):
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    parent_id: UUID = Field(default=None, foreign_key=Parent.id)
    parent: "Parent" = Relationship(back_populates="children")

async def main():
    engine = create_async_engine("sqlite+aiosqlite://")
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

    async with AsyncSession(engine) as …
Run Code Online (Sandbox Code Playgroud)

python sqlite sqlalchemy python-asyncio sqlmodel

15
推荐指数
1
解决办法
2万
查看次数

Asyncio RuntimeError:事件循环已关闭

我正在尝试使用Asyncio和aiohttp库发出一堆请求(~1000),但我遇到了一个我找不到太多信息的问题.

当我用10个网址运行这个代码时,它运行得很好.当我用100多个网址运行它时,它会中断并给我RuntimeError: Event loop is closed错误.

import asyncio
import aiohttp


@asyncio.coroutine
def get_status(url):
    code = '000'
    try:
        res = yield from asyncio.wait_for(aiohttp.request('GET', url), 4)
        code = res.status
        res.close()
    except Exception as e:
        print(e)
    print(code)


if __name__ == "__main__":
    urls = ['https://google.com/'] * 100
    coros = [asyncio.Task(get_status(url)) for url in urls]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(coros))
    loop.close()
Run Code Online (Sandbox Code Playgroud)

堆栈跟踪可以在这里找到.

任何帮助或洞察都会非常感激,因为我现在已经敲了几个小时.显然,这表明事件循环已经关闭,应该仍然是开放的,但我不知道这是怎么可能的.

python python-3.x python-asyncio aiohttp

14
推荐指数
2
解决办法
1万
查看次数

习惯性地从期货的词汇中收集结果

我正在努力写出一些尽可能惯用的东西来收集存储在字典中的期货的结果.

我们假设我有以下代码:

import asyncio

async def sleep(seconds):
    print(f'sleeping for {seconds} seconds')
    await asyncio.sleep(seconds)
    print(f'finished sleeping {seconds} seconds')


async def run():
    tasks = {
        '4': sleep(4),
        '3': sleep(3),
        '2': sleep(2),
        '1': sleep(1),
    }
    print(await gather_from_dict(tasks))


if __name__ == '__main__':
     asyncio.get_event_loop().run_until_complete(run())
Run Code Online (Sandbox Code Playgroud)

我期待的输出是:

sleeping for 2 seconds
sleeping for 1 seconds
sleeping for 4 seconds
sleeping for 3 seconds
finished sleeping 1 seconds
finished sleeping 2 seconds
finished sleeping 3 seconds
finished sleeping 4 seconds
{'4': None, '3': None, '2': None, '1': None} …
Run Code Online (Sandbox Code Playgroud)

python asynchronous idioms python-3.x python-asyncio

14
推荐指数
1
解决办法
1171
查看次数

asyncio.create_task 与 await

我无法理解asyncio.create_task()Python 3.7 中引入的函数应该如何工作。如果我做:

import asyncio

async def helloworld():
    print("Hello world from a coroutine!")
    asyncio.create_task(helloworld())

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(helloworld())

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

我得到:

Hello world from a coroutine! Hello world from a coroutine!

作为输出(即协程运行两次)。这怎么不是无限递归呢?当我使用await关键字时,我希望看到我看到的内容:

import asyncio


async def helloworld():
    print("Hello world from a coroutine!")
    await helloworld()


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(helloworld())


if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

有了这个,我得到:

Hello world from a coroutine! Hello world from a coroutine! Hello world from a coroutine! …

python python-asyncio

14
推荐指数
2
解决办法
6167
查看次数

如何根据状态码重试异步 aiohttp 请求

我正在使用 api,有时它会给出一些奇怪的状态代码,只需重试相同的请求就可以修复这些代码。我正在使用 aiohttp 异步地向这个 api 发出请求

我也在使用退避库来重试请求,但是似乎仍然没有重试 401 请求。

   @backoff.on_exception(backoff.expo, aiohttp.ClientError, max_tries=11, max_time=60)
    async def get_user_timeline(self, session, user_id, count, max_id, trim_user, include_rts, tweet_mode):

        params = {
            'user_id': user_id,
            'trim_user': trim_user,
            'include_rts': include_rts,
            'tweet_mode': tweet_mode,
            'count': count
        }


        if (max_id and max_id != -1):
            params.update({'max_id': max_id})

        headers = {
            'Authorization': 'Bearer {}'.format(self.access_token)    
        }

        users_lookup_url = "/1.1/statuses/user_timeline.json"

        url = self.base_url + users_lookup_url

        async with session.get(url, params=params, headers=headers) as response:
            result = await response.json()
            response = {
                'result': result,
                'status': response.status,
                'headers': response.headers …
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-asyncio aiohttp

14
推荐指数
2
解决办法
1万
查看次数

aiogevent事件循环“失败”以跟踪greenlet

我最近偶然发现了混淆gevent和基于asyncio的代码的问题,因为当我用猴子修补它们时,某些同步库可以很好地工作gevent.monkey.patch_all()。我找到了该aiogevent库,该库似乎对实现PEP 3156有所帮助,并将asyncio事件循环替换为您选择的其他实现(在本例中为gevent)。我发现的git仓库的最后一次重大提交是在4年前。修复setup.py之后,我设法成功安装了它,但是问题是它没有通过所有测试。

这些测试之一是test_soon,它产生应该执行操作并停止循环的greenlet。该测试将永远挂起,因为loop.stop()对循环没有任何影响,该循环预计将在所有任务完成后停止。我写了两个代码片段来检查它是否在传统的协程中发生,而另一个则通过来检查gevent.spawn

import gevent
import aiogevent
import asyncio

asyncio.set_event_loop_policy(aiogevent.EventLoopPolicy())

loop = asyncio.get_event_loop()

async def func():
    print('bloop')
    loop.stop()

loop.create_task(func())
loop.run_forever() # works alright and stops as soon as func finish
Run Code Online (Sandbox Code Playgroud)

gevent.spawn

import gevent
import aiogevent
import asyncio

asyncio.set_event_loop_policy(aiogevent.EventLoopPolicy())

loop = asyncio.get_event_loop()

def func():
    print('bloop')
    loop.stop()

g = gevent.spawn(func)
loop.run_forever() # func is executed as soon as loop runs, but loop.stop() is ignored
Run Code Online (Sandbox Code Playgroud)

问题是:这里可能出什么问题?我清楚地看到,在启动循环之后,greenlet会运行,但是循环是否“未跟踪”?我在异步源中找不到与该机制相对应的确切行,对于gevent来说也是如此-我对这些模块的内部并不十分熟悉,并且通过它们进行搜索令人困惑,但是我想知道有什么区别和aiogevent为了通过测试,必须对事件循环进行哪些更改。

upd1:为了强调该问题,gevent.hub.Hub …

python asynchronous gevent python-3.x python-asyncio

14
推荐指数
1
解决办法
275
查看次数

如何模拟 aiohttp.ClientSession 做出的响应?

我正在使用 aiohttp 发出异步请求,我想测试我的代码。我想模拟 aiohttp.ClientSession 发送的请求。我正在寻找类似于响应处理requestslib模拟的方式。

我如何模拟aiohttp.ClientSession?

# sample method
async def get_resource(self, session):
    async with aiohttp.ClientSession() as session:
        response = await self.session.get("some-external-api.com/resource")
        if response.status == 200:
            result = await response.json()
            return result

        return {...}

# I want to do something like ...
aiohttp_responses.add(
    method='GET', 
    url="some-external-api.com/resource", 
    status=200, 
    json={"message": "this worked"}
)

async def test_get_resource(self):
    result = await get_resource()
    assert result == {"message": "this worked"}
Run Code Online (Sandbox Code Playgroud)
  • 我已经通读了aiohttp 测试文档。似乎他们涵盖了模拟对您的 Web 服务器的传入请求,但我不确定这是否有助于我模拟对传出请求的响应

编辑

我在几个项目中使用了https://github.com/pnuckowski/aioresponses,它非常适合我的需求。

python-asyncio aiohttp pytest-aiohttp

14
推荐指数
2
解决办法
4794
查看次数

异步任务与协程

阅读asyncio 文档,我意识到我不理解一个非常基本和基本的方面:直接等待协程与等待包含在任务中的同一个协程之间的区别。

在文档示例中,对say_after协程的两次调用在没有等待时顺序运行create_task,而在包装中时并发运行create_task。所以我明白这基本上就是区别,这是一个非常重要的区别。

然而让我困惑的是,在我到处阅读的示例代码中(例如展示如何使用aiohttp),有很多地方等待(用户定义的)协程(通常在其他一些用户定义的协程中间)而没有被包裹在一个任务中,我想知道为什么会这样。确定何时应将协程包装在任务中的标准是什么?

python-asyncio

14
推荐指数
1
解决办法
2254
查看次数