标签: python-asyncio

self = None是做什么的?

我正在阅读传入asyncio包的源代码.请注意,在方法的最后,有一个self = None声明.它有什么作用?

def _run(self):
    try:
        self._callback(*self._args)
    except Exception as exc:
        msg = 'Exception in callback {}{!r}'.format(self._callback,
                                                    self._args)
        self._loop.call_exception_handler({
            'message': msg,
            'exception': exc,
            'handle': self,
        })
    self = None  # Needed to break cycles when an exception occurs.
Run Code Online (Sandbox Code Playgroud)

我认为它会删除实例,但以下测试不建议如此:

class K:
    def haha(self):
        self = None

a = K()
a.haha()
print(a) # a is still an instance
Run Code Online (Sandbox Code Playgroud)

python garbage-collection python-internals python-asyncio

24
推荐指数
1
解决办法
4736
查看次数

Python - 如何使用asyncio同时运行多个协同程序?

我正在使用该websockets库在Python 3.4中创建一个websocket服务器.这是一个简单的echo服务器:

import asyncio
import websockets

@asyncio.coroutine
def connection_handler(websocket, path):
    while True:
        msg = yield from websocket.recv()
        if msg is None:  # connection lost
            break
        yield from websocket.send(msg)

start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
Run Code Online (Sandbox Code Playgroud)

假设我们 - 另外 - 想要在某些事件发生时向客户端发送消息.为简单起见,让我们每隔60秒定期发送一条消息.我们怎么做?我的意思是,因为connection_handler经常等待收到的消息,服务器只能收到客户端的消息后才采取行动,对吗?我在这里错过了什么?

也许这种情况需要一个基于事件/回调的框架,而不是一个基于协同程序的框架?龙卷风

python websocket python-3.x python-asyncio

24
推荐指数
3
解决办法
2万
查看次数

如何在ProcessPool中处理SQLAlchemy连接?

我有一个反应器从RabbitMQ代理获取消息并触发工作方法在进程池中处理这些消息,如下所示:

反应堆

这是使用python实现的asyncio,loop.run_in_executor()concurrent.futures.ProcessPoolExecutor.

现在我想使用SQLAlchemy访问worker方法中的数据库.大多数情况下,处理将是非常简单和快速的CRUD操作.

反应器在开始时每秒处理10-50条消息,因此不能为每个请求打开新的数据库连接.相反,我想在每个进程中维护一个持久连接.

我的问题是:我怎么能这样做?我可以将它们存储在全局变量中吗?SQA连接池是否会为我处理这个问题?当反应堆停止时如何清理?

[更新]

  • 数据库是带有InnoDB的MySQL.

为什么选择带有进程池的模式?

当前实现使用不同的模式,其中每个使用者在其自己的线程中运行.不知何故,这不是很好.已经有大约200个消费者在他们自己的线程中运行,并且系统正在快速增长.为了更好地扩展,我们的想法是分离关注点并在I/O循环中使用消息并将处理委托给池.当然,整个系统的性能主要是I/O绑定.但是,处理大型结果集时CPU是一个问题.

另一个原因是"易用性".虽然消息的连接处理和消耗是异步实现的,但是worker中的代码可以是同步且简单的.

很快,很明显,通过工作者内部的持久网络连接访问远程系统是一个问题.这就是CommunicationChannels的用途:在worker中,我可以通过这些通道向消息总线发出请求.

我目前的一个想法是以类似的方式处理数据库访问:将语句通过队列传递到事件循环,然后将它们发送到数据库.但是,我不知道如何使用SQLAlchemy执行此操作.入口点在哪里?对象需要pickled在它们通过队列时传递.如何从SQA查询中获取此类对象?与数据库的通信必须异步工作,以免阻塞事件循环.我可以使用例如aiomysql作为SQA的数据库驱动程序吗?

python sqlalchemy rabbitmq python-asyncio python-multiprocessing

24
推荐指数
1
解决办法
2108
查看次数

RuntimeError:async + apscheduler中的线程中没有当前事件循环

我有一个异步功能,需要每隔N分钟使用apscheduller运行一次.下面有一个python代码

URL_LIST = ['<url1>',
            '<url2>',
            '<url2>',
            ]

def demo_async(urls):
    """Fetch list of web pages asynchronously."""
    loop = asyncio.get_event_loop() # event loop
    future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
    loop.run_until_complete(future) # loop until done

async def fetch_all(urls):
    tasks = [] # dictionary of start times for each url
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(url, session))
            tasks.append(task) # create list of tasks
        _ = await asyncio.gather(*tasks) # gather task responses

async def fetch(url, session):
    """Fetch a …
Run Code Online (Sandbox Code Playgroud)

python python-3.x apscheduler python-asyncio aiohttp

24
推荐指数
5
解决办法
3万
查看次数

来自协程的产量与来自任务的产量

Guido van Rossum在2014年关于Tulip/Asyncio的演讲中展示了幻灯片:

任务与协同程序

  • 相比:

    • res =来自some_coroutine的收益率(...)
    • res =来自任务的产量(some_coroutine(...))
  • 任务可以在不等待的情况下取得进展

    • 作为日志,你等待别的东西
      • 即产量

我完全忽略了这一点.

从我的观点来看,两个结构都是相同的:

如果是裸协程 - 它会被调度,所以无论如何都会创建任务,因为调度程序与Tasks一起运行,然后协程调用程序协同程序被暂停,直到被调用者完成,然后可以自由继续执行.

如果Task- 完全相同 - 新任务被调度,调用者协程等待完成.

在这两种情况下执行代码的方式与开发人员在实践中应考虑的影响有何不同?

ps
非常感谢与权威来源(GvR,PEP,docs,核心开发者笔记)的链接.

python concurrency asynchronous python-3.x python-asyncio

23
推荐指数
2
解决办法
5427
查看次数

为什么大多数asyncio示例都使用loop.run_until_complete()?

我正在阅读Python文档asyncio,我想知道为什么大多数示例使用loop.run_until_complete()而不是Asyncio.ensure_future().

例如:https://docs.python.org/dev/library/asyncio-task.html

似乎ensure_future是一种更好的方式来展示非阻塞函数的优点.run_until_complete另一方面,像同步函数一样阻塞循环.

这让我觉得我应该使用run_until_complete而不是组合ensure_future使用loop.run_forever()来同时运行多个协同例程.

python-3.x python-asyncio

23
推荐指数
2
解决办法
2万
查看次数

从流中产生的正确方法是什么?

我有一个Connection用于包含读取和写入asyncio连接流的对象:

class Connection(object):

    def __init__(self, stream_in, stream_out):
        object.__init__(self)

        self.__in = stream_in
        self.__out = stream_out

    def read(self, n_bytes : int = -1):
        return self.__in.read(n_bytes)

    def write(self, bytes_ : bytes):
        self.__out.write(bytes_)
        yield from self.__out.drain()
Run Code Online (Sandbox Code Playgroud)

在服务器端,每次客户端连接时connected创建一个Connection对象,然后读取4个字节.

@asyncio.coroutine
def new_conection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    data = yield from conn.read(4)
    print(data)
Run Code Online (Sandbox Code Playgroud)

在客户端,写出4个字节.

@asyncio.coroutine
def client(loop):
    ...
    conn = Connection(stream_in, stream_out)
    yield from conn.write(b'test')
Run Code Online (Sandbox Code Playgroud)

这个作品几乎为预期的,但我必须yield from每天readwrite电话.我yield from从里面尝试过Connection: …

python generator python-3.x python-asyncio yield-from

23
推荐指数
1
解决办法
1322
查看次数

如何在 pdb 中等待协程

我正在使用一个异步库 ( asyncpg ),我想调试一些异步调用来查询数据库。

我放置了一个 pdb 断点并想尝试一些查询:

(pdb) await asyncpg.fetch("select * from foo;")
*** SyntaxError: 'await' outside function
Run Code Online (Sandbox Code Playgroud)

能够这样做会很棒,因为它允许我尝试一些 SQL 查询并查看结果,所有这些都来自我的调试器。

是否可以?

python pdb python-asyncio python-3.7

23
推荐指数
1
解决办法
1881
查看次数

pytest-asyncio 有一个封闭的事件循环,但仅在运行所有测试时

我有一个测试来验证异步响应是否抛出异常,我正在使用 pytest-asyncio 版本 0.10.0 来运行该测试。

代码基本上是:

class TestThis:
    @pytest.mark.asyncio
    def test_the_thing(self):
       arg1 = "cmd"
       arg2 = "second command"
       with pytest.raises(CustomException):
           await do_thing(arg1, arg2)
Run Code Online (Sandbox Code Playgroud)

现在真正奇怪的是,如果我单独运行这个测试,或者单独运行这个类,这个测试就可以正常工作。但是,当我运行所有测试(项目根目录下的 pytest)时,每次都会失败并出现运行时错误,表示循环已关闭。

pytest python-asyncio pytest-asyncio

23
推荐指数
2
解决办法
2万
查看次数

如何在Jupyter笔记本中运行Python asyncio代码?

我有一些asyncio代码在Python解释器(CPython 3.6.2)中运行良好.我现在想在一个带有IPython内核的Jupyter笔记本中运行它.

我可以用它来运行它

import asyncio
asyncio.get_event_loop().run_forever()
Run Code Online (Sandbox Code Playgroud)

虽然这似乎工作,它似乎也阻止了笔记本电脑,似乎并没有与笔记本电脑很好玩.

我的理解是Jupyter使用了Tornado,所以我尝试按照Tornado文档中的建议安装Tornado事件循环:

from tornado.platform.asyncio import AsyncIOMainLoop
AsyncIOMainLoop().install()
Run Code Online (Sandbox Code Playgroud)

但是,这会产生以下错误:

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-1-1139449343fc> in <module>()
      1 from tornado.platform.asyncio import AsyncIOMainLoop
----> 2 AsyncIOMainLoop().install()

~\AppData\Local\Continuum\Anaconda3\envs\numismatic\lib\site- packages\tornado\ioloop.py in install(self)
    179         `IOLoop` (e.g.,     :class:`tornado.httpclient.AsyncHTTPClient`).
    180         """
--> 181         assert not IOLoop.initialized()
    182         IOLoop._instance = self
    183 

AssertionError: 
Run Code Online (Sandbox Code Playgroud)

最后我找到了以下页面:http://ipywidgets.readthedocs.io/en/stable/examples/Widget%20Asynchronous.html

所以我添加了一个包含以下代码的单元格:

import asyncio
from ipykernel.eventloops import register_integration

@register_integration('asyncio')
def loop_asyncio(kernel):
    '''Start a kernel with asyncio event loop support.'''
    loop = asyncio.get_event_loop() …
Run Code Online (Sandbox Code Playgroud)

python ipython-notebook python-asyncio jupyter

22
推荐指数
4
解决办法
7161
查看次数