有了asyncio我见过的图书馆,
@asyncio.coroutine
def function():
...
Run Code Online (Sandbox Code Playgroud)
和
async def function():
...
Run Code Online (Sandbox Code Playgroud)
可互换使用.
这两者之间有什么功能差异吗?
我有以下代码使用asyncio和aiohttp发出异步HTTP请求.
import sys
import asyncio
import aiohttp
@asyncio.coroutine
def get(url):
try:
print('GET %s' % url)
resp = yield from aiohttp.request('GET', url)
except Exception as e:
raise Exception("%s has error '%s'" % (url, e))
else:
if resp.status >= 400:
raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))
return (yield from resp.text())
@asyncio.coroutine
def fill_data(run):
url = 'http://www.google.com/%s' % run['name']
run['data'] = yield from get(url)
def get_runs():
runs = [ {'name': 'one'}, {'name': 'two'} ]
loop …Run Code Online (Sandbox Code Playgroud) @pytest.fixture
def d_service():
c = DService()
return c
# @pytest.mark.asyncio # tried it too
async def test_get_file_list(d_service):
files = await d_service.get_file_list('')
print(files)
Run Code Online (Sandbox Code Playgroud)
然而,却出现了以下错误?
已收集 0 项 / 1 错误
===================================== 错误============== =====================
________________ 收集测试/e2e_tests/test_d.py 时出错 _________________
..\..\..\..\..\anaconda3\lib\site-packages\pluggy\__init__.py:617: 在 __call__ 中
返回 self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
..\..\..\..\..\anaconda3\lib\site-packages\pluggy\__init__.py:222: 在 _hookexec 中
返回 self._inner_hookexec(钩子、方法、kwargs)
..\..\..\..\..\anaconda3\lib\site-packages\pluggy\__init__.py:216:在
第一个结果=hook.spec_opts.get('第一个结果'),
..\..\..\..\..\anaconda3\lib\site-packages\_pytest\python.py:171:在 pytest_pycollect_makeitem 中
res = 结果.get_result()
..\..\..\..\..\anaconda3\lib\site-packages\anyio\pytest_plugin.py:98:在 pytest_pycollect_makeitem 中
标记=collector.get_closest_marker('anyio')
E AttributeError:“模块”对象没有属性“get_closest_marker”
!!!!!!!!!!!!!!!!!!!!! 已中断:收集期间出现 1 个错误!!!!!!!!!!!!!!!!!!
=========================== 2.53 秒内出现 1 个错误 ================== =========
我安装了以下软件包。错误消失,但测试被跳过。
pip install …Run Code Online (Sandbox Code Playgroud) 我正在试图弄清楚如何移植一个线程程序来使用asyncio.我有很多代码可以围绕几个标准库同步Queues,基本上是这样的:
import queue, random, threading, time
q = queue.Queue()
def produce():
while True:
time.sleep(0.5 + random.random()) # sleep for .5 - 1.5 seconds
q.put(random.random())
def consume():
while True:
value = q.get(block=True)
print("Consumed", value)
threading.Thread(target=produce).start()
threading.Thread(target=consume).start()
Run Code Online (Sandbox Code Playgroud)
一个线程创建值(可能是用户输入),另一个线程用它们做某事.关键是这些线程在有新数据之前一直处于空闲状态,此时它们会唤醒并对其执行某些操作.
我正在尝试使用asyncio实现这种模式,但我似乎无法弄清楚如何让它"去".
我的尝试看起来或多或少都像这样(并且根本不做任何事情).
import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
while True:
q.put(random.random())
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
print("Consumed", value)
# do something here to start the coroutines. asyncio.Task()? …Run Code Online (Sandbox Code Playgroud) 我的意思是我从使用async for. 这是我用 编写的代码async for,AIter(10)可以替换为get_range().
但是代码运行起来像同步而不是异步。
import asyncio
async def get_range():
for i in range(10):
print(f"start {i}")
await asyncio.sleep(1)
print(f"end {i}")
yield i
class AIter:
def __init__(self, N):
self.i = 0
self.N = N
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
print(f"start {i}")
await asyncio.sleep(1)
print(f"end {i}")
if i >= self.N:
raise StopAsyncIteration
self.i += 1
return i
async def main():
async for p in AIter(10):
print(f"finally {p}")
if …Run Code Online (Sandbox Code Playgroud) 我正在构建一个 SMTP 服务器,并aiosmtpd使用这些示例作为构建的基础。下面是程序入口点的代码片段。
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(amain(loop=loop))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
Run Code Online (Sandbox Code Playgroud)
当我运行该程序时,我收到以下警告:
server.py:61: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
Run Code Online (Sandbox Code Playgroud)
实现这个的正确方法是什么?
我已经使用Python asyncio和aiohttp 成功构建了一个RESTful微服务,它监听POST事件以从各种馈送器收集实时事件.
然后,它构建一个内存中结构,以便在嵌套的defaultdict/deque结构中缓存最后24h的事件.
现在我想定期检查该结构到光盘,最好使用泡菜.
由于内存结构可能> 100MB,我想避免在检查结构所需的时间内阻止我的传入事件处理.
我宁愿创建结构的快照拷贝(例如深度拷贝),然后花时间将其写入磁盘并在预设的时间间隔内重复.
我一直在寻找关于如何组合线程的例子(并且是一个线程甚至是最好的解决方案吗?)和asyncio用于那个目的但找不到能帮助我的东西.
任何开始使用的指针都非常感谢!
如何模拟从一个本地协程到另一个使用的异步调用unittest.mock.patch?
我目前有一个很尴尬的解决方案:
class CoroutineMock(MagicMock):
def __await__(self, *args, **kwargs):
future = Future()
future.set_result(self)
result = yield from future
return result
Run Code Online (Sandbox Code Playgroud)
然后
class TestCoroutines(TestCase):
@patch('some.path', new_callable=CoroutineMock)
def test(self, mock):
some_action()
mock.assert_called_with(1,2,3)
Run Code Online (Sandbox Code Playgroud)
这有效,但看起来很难看.是否有更多的pythonic方式来做到这一点?
根据这个答案,我想在一个类中构建一个异步websoket客户端,该类将从另一个文件导入:
#!/usr/bin/env python3
import sys, json
import asyncio
from websockets import connect
class EchoWebsocket:
def __await__(self):
# see: https://stackoverflow.com/a/33420721/1113207
return self._async_init().__await__()
async def _async_init(self):
self._conn = connect('wss://ws.binaryws.com/websockets/v3')
self.websocket = await self._conn.__aenter__()
return self
async def close(self):
await self._conn.__aexit__(*sys.exc_info())
async def send(self, message):
await self.websocket.send(message)
async def receive(self):
return await self.websocket.recv()
class mtest:
async def start(self):
try:
self.wws = await EchoWebsocket()
finally:
await self.wws.close()
async def get_ticks(self):
await self.wws.send(json.dumps({'ticks_history': 'R_50', 'end': 'latest', 'count': 1}))
return await self.wws.receive()
if __name__ …Run Code Online (Sandbox Code Playgroud) 从谁写ASYNCIO代码,但正在寻求更好地理解内部工作的人的角度来看,是什么yield from,await以及如何允许异步代码这些有用吗?
有一个高度赞成的问题询问yield from语法的用法和解释异步和等待的问题,但两者都深入讨论了不同的主题,并不是对底层代码及其如何适应asyncio的简明解释.
python ×10
python-asyncio ×10
python-3.x ×3
async-await ×2
aiohttp ×1
aiosmtpd ×1
asynchronous ×1
coroutine ×1
generator ×1
pytest ×1
python-3.5 ×1
python-mock ×1
queue ×1
websocket ×1