标签: python-asyncio

在python 3.5中模拟异步调用

如何模拟从一个本地协程到另一个使用的异步调用unittest.mock.patch

我目前有一个很尴尬的解决方案:

class CoroutineMock(MagicMock):
    def __await__(self, *args, **kwargs):
        future = Future()
        future.set_result(self)
        result = yield from future
        return result
Run Code Online (Sandbox Code Playgroud)

然后

class TestCoroutines(TestCase):
    @patch('some.path', new_callable=CoroutineMock)
    def test(self, mock):
        some_action()
        mock.assert_called_with(1,2,3)
Run Code Online (Sandbox Code Playgroud)

这有效,但看起来很难看.是否有更多的pythonic方式来做到这一点?

python python-mock python-asyncio

32
推荐指数
7
解决办法
2万
查看次数

Asyncio事件循环已关闭

尝试运行docs中给出的asyncio hello world代码示例时:

import asyncio

async def hello_world():
    print("Hello World!")

loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
loop.run_until_complete(hello_world())
loop.close()
Run Code Online (Sandbox Code Playgroud)

我收到错误:

RuntimeError: Event loop is closed
Run Code Online (Sandbox Code Playgroud)

我使用的是python 3.5.3.

python python-asyncio

32
推荐指数
3
解决办法
2万
查看次数

loop.create_task,asyncio.async/ensure_future和Task之间有什么区别?

我对某些asyncio功能感到有些困惑.我看到有BaseEventLoop.create_task(coro)安排合作例程的功能.文档create_task说它是一个新函数和兼容性,我们应该asyncio.async(coro)通过再次引用docs来看到它是一个别名,asyncio.ensure_future(coro)它再次调度一个协同例程的执行.

与此同时,我一直在Task(coro)用于调度协同例程执行,而且似乎也工作得很好.那么,这些之间的区别是什么?

python coroutine python-3.x python-asyncio

31
推荐指数
1
解决办法
7236
查看次数

SqlAlchemy asyncio orm:如何查询数据库

在 SqlAlchemy 异步 orm 引擎中,如何查询表并获取一个值或全部值?

我从非异步方法知道我可以这样做

SESSION.query(TableClass).get(x)

但尝试使用异步方法会引发下一个错误:

AttributeError: 'AsyncSession' object has no attribute 'query'.

这是我定义的 SESSION 变量。LOOP 变量仅asyncio.get_event_loop()用于在加载 sql 模块时启动异步方法并填充用作缓存的变量,以避免每次需要时都缓存数据库:

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

from .. import CONFIG, LOOP

def _build_async_db_uri(uri):
    if "+asyncpg" not in uri:
        return '+asyncpg:'.join(uri.split(":", 1))
    return uri

async def start() -> declarative_base:
    engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
    async with engine.begin() as conn:
        BASE.metadata.bind = engine
        await conn.run_sync(BASE.metadata.create_all)
    return scoped_session(sessionmaker(bind=engine, autoflush=False, class_=AsyncSession))


BASE = declarative_base()
SESSION …
Run Code Online (Sandbox Code Playgroud)

sqlalchemy python-3.x python-asyncio

31
推荐指数
2
解决办法
5万
查看次数

如何在异步协程中包装同步函数?

我正在使用aiohttp构建一个API服务器,将TCP请求发送到单独的服务器.发送TCP请求的模块是同步的,并且是出于我的目的的黑盒子.所以我的问题是这些请求阻止了整个API.我需要一种方法将模块请求包装在异步协程中,该协程不会阻止API的其余部分.

所以,仅仅使用sleep一个简单的例子,有没有办法以某种方式将耗时的同步代码包装在非阻塞协程中,如下所示:

async def sleep_async(delay):
    # After calling sleep, loop should be released until sleep is done
    yield sleep(delay)
    return 'I slept asynchronously'
Run Code Online (Sandbox Code Playgroud)

python asynchronous python-3.x python-asyncio aiohttp

30
推荐指数
5
解决办法
1万
查看次数

来自Flask路线的Python3 Asyncio调用

我想在每次执行flask路径时执行异步函数.目前我的abar函数从未执行过.你能告诉我为什么吗?非常感谢你:

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=loop)
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
    loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

我也尝试将一个阻塞调用放在一个单独的线程中.但它仍然没有称之为abar功能.

import asyncio
from threading import Thread
from flask import Flask

async def abar(a):
    print(a)

app = Flask(__name__)

def start_worker(loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        loop.close()

worker_loop = asyncio.new_event_loop()
worker = Thread(target=start_worker, args=(worker_loop,))

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=worker_loop)
    return "OK"

if __name__ == "__main__":
    worker.start()
    app.run(debug=False, use_reloader=False)
Run Code Online (Sandbox Code Playgroud)

event-loop flask python-3.x python-asyncio

30
推荐指数
4
解决办法
2万
查看次数

"SSLError:[SSL] PEM库(_ssl.c:2532)"是什么意思使用Python ssl库?

我正在尝试使用Python 3 asyncio模块连接到另一方并获得此错误:

     36     sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
---> 37     sslcontext.load_cert_chain(cert, keyfile=ca_cert)
     38

SSLError: [SSL] PEM lib (_ssl.c:2532)
Run Code Online (Sandbox Code Playgroud)

问题是错误的意思.我的证书是正确的,密钥文件(CA证书)可能不正确.

python ssl ssl-certificate python-3.x python-asyncio

29
推荐指数
3
解决办法
5万
查看次数

如何将coroutine添加到正在运行的asyncio循环?

如何为正在运行的asyncio循环添加新的协同程序?IE浏览器.一个已经执行了一套协同程序的程序.

我想作为一种解决方法,可以等待现有协程完成,然后初始化一个新循环(使用附加协程).但有更好的方法吗?

python asynchronous python-3.x python-asyncio

29
推荐指数
5
解决办法
3万
查看次数

asyncio是否支持文件操作的异步I/O?

asyncio是否支持文件操作的异步I/O?如果是,我如何在Python 3.5中使用async/await语法代码?

python python-3.x python-asyncio python-3.5

29
推荐指数
4
解决办法
2万
查看次数

asyncio.create_task() 有什么作用?

有什么作用asyncio.create_task()?我看过文档,似乎无法理解。一些让我困惑的代码是这样的:

import asyncio

async def counter_loop(x, n):
    for i in range(1, n + 1):
        print(f"Counter {x}: {i}")
        await asyncio.sleep(0.5)
    return f"Finished {x} in {n}"

async def main():
    slow_task = asyncio.create_task(counter_loop("Slow", 4))
    fast_coro = counter_loop("Fast", 2)

    print("Awaiting Fast")
    fast_val = await fast_coro
    print("Finished Fast")

    print("Awaiting Slow")
    slow_val = await slow_task
    print("Finished Slow")

    print(f"{fast_val}, {slow_val}")

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

这给出了以下输出:

001 | Awaiting Fast
002 | Counter Fast: 1
003 | Counter Slow: 1
004 | Counter Fast: 2
005 | Counter Slow: …
Run Code Online (Sandbox Code Playgroud)

python python-3.x async-await python-asyncio

29
推荐指数
1
解决办法
1万
查看次数