标签: python-asyncio

@ asyncio.coroutine vs async def

有了asyncio我见过的图书馆,

@asyncio.coroutine
def function():
    ...
Run Code Online (Sandbox Code Playgroud)

async def function():
    ...
Run Code Online (Sandbox Code Playgroud)

可互换使用.

这两者之间有什么功能差异吗?

python python-3.x async-await python-asyncio python-3.5

39
推荐指数
2
解决办法
9889
查看次数

Python中的异步异常处理

我有以下代码使用asyncioaiohttp发出异步HTTP请求.

import sys
import asyncio
import aiohttp

@asyncio.coroutine
def get(url):
    try:
        print('GET %s' % url)
        resp = yield from aiohttp.request('GET', url)
    except Exception as e:
        raise Exception("%s has error '%s'" % (url, e))
    else:
        if resp.status >= 400:
            raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))

    return (yield from resp.text())

@asyncio.coroutine
def fill_data(run):
    url = 'http://www.google.com/%s' % run['name']
    run['data'] = yield from get(url)

def get_runs():
    runs = [ {'name': 'one'}, {'name': 'two'} ]
    loop …
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-asyncio

38
推荐指数
2
解决办法
3万
查看次数

如何使用 pytest 测试异步函数?

@pytest.fixture
def d_service():
    c = DService()
    return c

# @pytest.mark.asyncio  # tried it too
async def test_get_file_list(d_service):
    files = await d_service.get_file_list('')
    print(files)
Run Code Online (Sandbox Code Playgroud)

然而,却出现了以下错误?

已收集 0 项 / 1 错误

===================================== 错误============== =====================
________________ 收集测试/e2e_tests/test_d.py 时出错 _________________
..\..\..\..\..\anaconda3\lib\site-packages\pluggy\__init__.py:617: 在 __call__ 中
    返回 self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
..\..\..\..\..\anaconda3\lib\site-packages\pluggy\__init__.py:222: 在 _hookexec 中
    返回 self._inner_hookexec(钩子、方法、kwargs)
..\..\..\..\..\anaconda3\lib\site-packages\pluggy\__init__.py:216:在
    第一个结果=hook.spec_opts.get('第一个结果'),
..\..\..\..\..\anaconda3\lib\site-packages\_pytest\python.py:171:在 pytest_pycollect_makeitem 中
    res = 结果.get_result()
..\..\..\..\..\anaconda3\lib\site-packages\anyio\pytest_plugin.py:98:在 pytest_pycollect_makeitem 中
    标记=collector.get_closest_marker('anyio')
E AttributeError:“模块”对象没有属性“get_closest_marker”
!!!!!!!!!!!!!!!!!!!!! 已中断:收集期间出现 1 个错误!!!!!!!!!!!!!!!!!!
=========================== 2.53 秒内出现 1 个错误 ================== =========

我安装了以下软件包。错误消失,但测试被跳过。

pip install …
Run Code Online (Sandbox Code Playgroud)

python pytest python-asyncio pytest-asyncio

36
推荐指数
3
解决办法
4万
查看次数

与工作者"线程"相同的asyncio.Queues

我正在试图弄清楚如何移植一个线程程序来使用asyncio.我有很多代码可以围绕几个标准库同步Queues,基本上是这样的:

import queue, random, threading, time

q = queue.Queue()

def produce():
    while True:
        time.sleep(0.5 + random.random())  # sleep for .5 - 1.5 seconds
        q.put(random.random())

def consume():
    while True: 
        value = q.get(block=True)
        print("Consumed", value)

threading.Thread(target=produce).start()
threading.Thread(target=consume).start()
Run Code Online (Sandbox Code Playgroud)

一个线程创建值(可能是用户输入),另一个线程用它们做某事.关键是这些线程在有新数据之前一直处于空闲状态,此时它们会唤醒并对其执行某些操作.

我正在尝试使用asyncio实现这种模式,但我似乎无法弄清楚如何让它"去".

我的尝试看起来或多或少都像这样(并且根本不做任何事情).

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True: 
        q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)

# do something here to start the coroutines. asyncio.Task()? …
Run Code Online (Sandbox Code Playgroud)

python queue python-3.x python-asyncio

34
推荐指数
2
解决办法
1万
查看次数

如何在 Python 中使用`async for`?

我的意思是我从使用async for. 这是我用 编写的代码async forAIter(10)可以替换为get_range().

但是代码运行起来像同步而不是异步。

import asyncio

async def get_range():
    for i in range(10):
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        yield i

class AIter:
    def __init__(self, N):
        self.i = 0
        self.N = N

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        if i >= self.N:
            raise StopAsyncIteration
        self.i += 1
        return i

async def main():
    async for p in AIter(10):
        print(f"finally {p}")

if …
Run Code Online (Sandbox Code Playgroud)

python asynchronous python-asyncio

34
推荐指数
2
解决办法
2万
查看次数

asyncio.get_event_loop(): DeprecationWarning: 没有当前事件循环

我正在构建一个 SMTP 服务器,并aiosmtpd使用这些示例作为构建的基础。下面是程序入口点的代码片段。

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.create_task(amain(loop=loop))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
Run Code Online (Sandbox Code Playgroud)

当我运行该程序时,我收到以下警告:

server.py:61: DeprecationWarning: There is no current event loop
  loop = asyncio.get_event_loop()
Run Code Online (Sandbox Code Playgroud)

实现这个的正确方法是什么?

python python-asyncio aiosmtpd

34
推荐指数
1
解决办法
4万
查看次数

如何将python asyncio与线程结合起来?

我已经使用Python asyncio和aiohttp 成功构建了一个RESTful微服务,它监听POST事件以从各种馈送器收集实时事件.

然后,它构建一个内存中结构,以便在嵌套的defaultdict/deque结构中缓存最后24h的事件.

现在我想定期检查该结构到光盘,最好使用泡菜.

由于内存结构可能> 100MB,我想避免在检查结构所需的时间内阻止我的传入事件处理.

我宁愿创建结构的快照拷贝(例如深度拷贝),然后花时间将其写入磁盘并在预设的时间间隔内重复.

我一直在寻找关于如何组合线程的例子(并且是一个线程甚至是最好的解决方案吗?)和asyncio用于那个目的但找不到能帮助我的东西.

任何开始使用的指针都非常感谢!

python multithreading python-asyncio aiohttp

33
推荐指数
3
解决办法
2万
查看次数

在python 3.5中模拟异步调用

如何模拟从一个本地协程到另一个使用的异步调用unittest.mock.patch

我目前有一个很尴尬的解决方案:

class CoroutineMock(MagicMock):
    def __await__(self, *args, **kwargs):
        future = Future()
        future.set_result(self)
        result = yield from future
        return result
Run Code Online (Sandbox Code Playgroud)

然后

class TestCoroutines(TestCase):
    @patch('some.path', new_callable=CoroutineMock)
    def test(self, mock):
        some_action()
        mock.assert_called_with(1,2,3)
Run Code Online (Sandbox Code Playgroud)

这有效,但看起来很难看.是否有更多的pythonic方式来做到这一点?

python python-mock python-asyncio

32
推荐指数
7
解决办法
2万
查看次数

如何调用类中包含的异步函数?

根据这个答案,我想在一个类中构建一个异步websoket客户端,该类将从另一个文件导入:

#!/usr/bin/env python3

import sys, json
import asyncio
from websockets import connect

class EchoWebsocket:
    def __await__(self):
        # see: https://stackoverflow.com/a/33420721/1113207
        return self._async_init().__await__()

    async def _async_init(self):
        self._conn = connect('wss://ws.binaryws.com/websockets/v3')
        self.websocket = await self._conn.__aenter__()
        return self

    async def close(self):
        await self._conn.__aexit__(*sys.exc_info())

    async def send(self, message):
        await self.websocket.send(message)

    async def receive(self):
        return await self.websocket.recv()

class mtest:
    async def start(self):
        try:
            self.wws = await EchoWebsocket()
        finally:
            await self.wws.close()

    async def get_ticks(self):
        await self.wws.send(json.dumps({'ticks_history': 'R_50', 'end': 'latest', 'count': 1}))
        return await self.wws.receive()

if __name__ …
Run Code Online (Sandbox Code Playgroud)

python websocket python-asyncio

32
推荐指数
2
解决办法
2万
查看次数

"yield from"语法在asyncio中的作用是什么?它与"await"有什么不同?

从谁写ASYNCIO代码,但正在寻求更好地理解内部工作的人的角度来看,是什么yield from,await以及如何允许异步代码这些有用吗?

有一个高度赞成的问题询问yield from语法的用法和解释异步和等待的问题,但两者都深入讨论了不同的主题,并不是对底层代码及其如何适应asyncio的简明解释.

python generator coroutine async-await python-asyncio

32
推荐指数
1
解决办法
9130
查看次数