如何模拟从一个本地协程到另一个使用的异步调用unittest.mock.patch?
我目前有一个很尴尬的解决方案:
class CoroutineMock(MagicMock):
def __await__(self, *args, **kwargs):
future = Future()
future.set_result(self)
result = yield from future
return result
Run Code Online (Sandbox Code Playgroud)
然后
class TestCoroutines(TestCase):
@patch('some.path', new_callable=CoroutineMock)
def test(self, mock):
some_action()
mock.assert_called_with(1,2,3)
Run Code Online (Sandbox Code Playgroud)
这有效,但看起来很难看.是否有更多的pythonic方式来做到这一点?
尝试运行docs中给出的asyncio hello world代码示例时:
import asyncio
async def hello_world():
print("Hello World!")
loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
loop.run_until_complete(hello_world())
loop.close()
Run Code Online (Sandbox Code Playgroud)
我收到错误:
RuntimeError: Event loop is closed
Run Code Online (Sandbox Code Playgroud)
我使用的是python 3.5.3.
我对某些asyncio功能感到有些困惑.我看到有BaseEventLoop.create_task(coro)安排合作例程的功能.文档create_task说它是一个新函数和兼容性,我们应该asyncio.async(coro)通过再次引用docs来看到它是一个别名,asyncio.ensure_future(coro)它再次调度一个协同例程的执行.
与此同时,我一直在Task(coro)用于调度协同例程执行,而且似乎也工作得很好.那么,这些之间的区别是什么?
在 SqlAlchemy 异步 orm 引擎中,如何查询表并获取一个值或全部值?
我从非异步方法知道我可以这样做
SESSION.query(TableClass).get(x)
但尝试使用异步方法会引发下一个错误:
AttributeError: 'AsyncSession' object has no attribute 'query'.
这是我定义的 SESSION 变量。LOOP 变量仅asyncio.get_event_loop()用于在加载 sql 模块时启动异步方法并填充用作缓存的变量,以避免每次需要时都缓存数据库:
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from .. import CONFIG, LOOP
def _build_async_db_uri(uri):
if "+asyncpg" not in uri:
return '+asyncpg:'.join(uri.split(":", 1))
return uri
async def start() -> declarative_base:
engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
async with engine.begin() as conn:
BASE.metadata.bind = engine
await conn.run_sync(BASE.metadata.create_all)
return scoped_session(sessionmaker(bind=engine, autoflush=False, class_=AsyncSession))
BASE = declarative_base()
SESSION …Run Code Online (Sandbox Code Playgroud) 我正在使用aiohttp构建一个API服务器,将TCP请求发送到单独的服务器.发送TCP请求的模块是同步的,并且是出于我的目的的黑盒子.所以我的问题是这些请求阻止了整个API.我需要一种方法将模块请求包装在异步协程中,该协程不会阻止API的其余部分.
所以,仅仅使用sleep一个简单的例子,有没有办法以某种方式将耗时的同步代码包装在非阻塞协程中,如下所示:
async def sleep_async(delay):
# After calling sleep, loop should be released until sleep is done
yield sleep(delay)
return 'I slept asynchronously'
Run Code Online (Sandbox Code Playgroud) 我想在每次执行flask路径时执行异步函数.目前我的abar函数从未执行过.你能告诉我为什么吗?非常感谢你:
import asyncio
from flask import Flask
async def abar(a):
print(a)
loop = asyncio.get_event_loop()
app = Flask(__name__)
@app.route("/")
def notify():
asyncio.ensure_future(abar("abar"), loop=loop)
return "OK"
if __name__ == "__main__":
app.run(debug=False, use_reloader=False)
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
我也尝试将一个阻塞调用放在一个单独的线程中.但它仍然没有称之为abar功能.
import asyncio
from threading import Thread
from flask import Flask
async def abar(a):
print(a)
app = Flask(__name__)
def start_worker(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
loop.close()
worker_loop = asyncio.new_event_loop()
worker = Thread(target=start_worker, args=(worker_loop,))
@app.route("/")
def notify():
asyncio.ensure_future(abar("abar"), loop=worker_loop)
return "OK"
if __name__ == "__main__":
worker.start()
app.run(debug=False, use_reloader=False)
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Python 3 asyncio模块连接到另一方并获得此错误:
36 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
---> 37 sslcontext.load_cert_chain(cert, keyfile=ca_cert)
38
SSLError: [SSL] PEM lib (_ssl.c:2532)
Run Code Online (Sandbox Code Playgroud)
问题是错误的意思.我的证书是正确的,密钥文件(CA证书)可能不正确.
如何为正在运行的asyncio循环添加新的协同程序?IE浏览器.一个已经执行了一套协同程序的程序.
我想作为一种解决方法,可以等待现有协程完成,然后初始化一个新循环(使用附加协程).但有更好的方法吗?
asyncio是否支持文件操作的异步I/O?如果是,我如何在Python 3.5中使用async/await语法代码?
有什么作用asyncio.create_task()?我看过文档,似乎无法理解。一些让我困惑的代码是这样的:
import asyncio
async def counter_loop(x, n):
for i in range(1, n + 1):
print(f"Counter {x}: {i}")
await asyncio.sleep(0.5)
return f"Finished {x} in {n}"
async def main():
slow_task = asyncio.create_task(counter_loop("Slow", 4))
fast_coro = counter_loop("Fast", 2)
print("Awaiting Fast")
fast_val = await fast_coro
print("Finished Fast")
print("Awaiting Slow")
slow_val = await slow_task
print("Finished Slow")
print(f"{fast_val}, {slow_val}")
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
这给出了以下输出:
001 | Awaiting Fast
002 | Counter Fast: 1
003 | Counter Slow: 1
004 | Counter Fast: 2
005 | Counter Slow: …Run Code Online (Sandbox Code Playgroud) python-asyncio ×10
python ×8
python-3.x ×8
asynchronous ×2
aiohttp ×1
async-await ×1
coroutine ×1
event-loop ×1
flask ×1
python-3.5 ×1
python-mock ×1
sqlalchemy ×1
ssl ×1