小编And*_*lov的帖子

与工作者"线程"相同的asyncio.Queues

我正在试图弄清楚如何移植一个线程程序来使用asyncio.我有很多代码可以围绕几个标准库同步Queues,基本上是这样的:

import queue, random, threading, time

q = queue.Queue()

def produce():
    while True:
        time.sleep(0.5 + random.random())  # sleep for .5 - 1.5 seconds
        q.put(random.random())

def consume():
    while True: 
        value = q.get(block=True)
        print("Consumed", value)

threading.Thread(target=produce).start()
threading.Thread(target=consume).start()
Run Code Online (Sandbox Code Playgroud)

一个线程创建值(可能是用户输入),另一个线程用它们做某事.关键是这些线程在有新数据之前一直处于空闲状态,此时它们会唤醒并对其执行某些操作.

我正在尝试使用asyncio实现这种模式,但我似乎无法弄清楚如何让它"去".

我的尝试看起来或多或少都像这样(并且根本不做任何事情).

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True: 
        q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)

# do something here to start the coroutines. asyncio.Task()? …
Run Code Online (Sandbox Code Playgroud)

python queue python-3.x python-asyncio

34
推荐指数
2
解决办法
1万
查看次数

如何模拟asyncio协同程序?

以下代码与TypeError: 'Mock' object is not iterablein 失败,ImBeingTested.i_call_other_coroutines因为我已被ImGoingToBeMockedMock对象替换.

如何模仿协同程序?

class ImGoingToBeMocked:
    @asyncio.coroutine
    def yeah_im_not_going_to_run(self):
        yield from asyncio.sleep(1)
        return "sup"

class ImBeingTested:
    def __init__(self, hidude):
        self.hidude = hidude

    @asyncio.coroutine
    def i_call_other_coroutines(self):
        return (yield from self.hidude.yeah_im_not_going_to_run())

class TestImBeingTested(unittest.TestCase):

    def test_i_call_other_coroutines(self):
        mocked = Mock(ImGoingToBeMocked)
        ibt = ImBeingTested(mocked)

        ret = asyncio.get_event_loop().run_until_complete(ibt.i_call_other_coroutines())
Run Code Online (Sandbox Code Playgroud)

python unit-testing mocking python-3.x python-asyncio

20
推荐指数
5
解决办法
9875
查看次数

带有asyncio的多个循环

是否可以使用asyncio进行多个循环?如果回答是肯定的,我该怎么做?我的用例是:*我从异步中的网站列表中提取网址*对于每个"子网址列表",我会在异步/抓取它们

提取网址的示例:

import asyncio
import aiohttp
from suburls import extractsuburls

@asyncio.coroutine
def extracturls(url):
    subtasks = []
    response = yield from aiohttp.request('GET', url)
    suburl_list = yield from response.text()
    for suburl in suburl_list:
        subtasks.append(asyncio.Task(extractsuburls(suburl)))
     loop = asyncio.get_event_loop()
     loop.run_until_complete(asyncio.gather(*subtasks))

 if __name__ == '__main__':
     urls_list = ['http://example1.com', 'http://example2.com']
     for url in url_list: 
          subtasks.append(asyncio.Task(extractsuburls(url)))  
     loop = asyncio.get_event_loop()
     loop.run_until_complete(asyncio.gather(*subtasks))
     loop.close()
Run Code Online (Sandbox Code Playgroud)

如果我执行这段代码,当python尝试启动第二个循环时,我会发现一个错误,就是说循环已经在运行了.

PS:我的模块"extractsuburls"使用aiohttp来执行web请求.

编辑:

好吧,我试过这个解决方案:

import asyncio
import aiohttp
from suburls import extractsuburls

@asyncio.coroutine
def extracturls( url ):
    subtasks = []
    response = yield from aiohttp.request('GET', url) …
Run Code Online (Sandbox Code Playgroud)

python asynchronous python-3.x python-asyncio aiohttp

18
推荐指数
1
解决办法
9269
查看次数

Python asyncio调试示例

我想启用Asyncio的非屈服协程检测,但没有成功.

这个简单的代码实现了以下建议:https:
//docs.python.org/3/library/asyncio-dev.html#asyncio-logger

但实际上并没有捕捉到未屈服的"虚拟"协程.

import sys, os
import asyncio
import logging
import warnings

os.environ['PYTHONASYNCIODEBUG'] = '1'
logging.basicConfig(level=logging.DEBUG)
warnings.resetwarnings()

@asyncio.coroutine
def dummy():
    print('yeah, dummy ran!!')

@asyncio.coroutine
def startdummy():
    print('creating dummy')
    dummy()

if __name__ == '__main__':
    lp = asyncio.get_event_loop()
    lp.run_until_complete(startdummy())
Run Code Online (Sandbox Code Playgroud)

我预计程序会以关于协同程序'dummy'的警告而结束,但是没有产生.

实际上,结果是:

DEBUG:asyncio:Using selector: SelectSelector
creating dummy
sys:1: ResourceWarning: unclosed <socket object at 0x02DCB6F0>
c:\python34\lib\importlib\_bootstrap.py:2150: ImportWarning: sys.meta_path is empty
sys:1: ResourceWarning: unclosed <socket object at 0x02DE10C0>
Run Code Online (Sandbox Code Playgroud)

没有一丝废弃的鲜花.我错过了什么?

python asynchronous python-3.x python-asyncio

17
推荐指数
1
解决办法
5231
查看次数

Asyncio和aiohttp将所有urls路径路由到处理程序

我很难找到匹配所有传入网址的通配符网址匹配模式.这只匹配一个只有主机名的url:

import asyncio
from aiohttp import web

@asyncio.coroutine
def handle(request):
    print('there was a request')
    text = "Hello "
    return web.Response(body=text.encode('utf-8'))

@asyncio.coroutine
def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', handle)

    srv = yield from loop.create_server(app.make_handler(),
                                        '127.0.0.1', 9999)
    print("Server started at http://'127.0.0.1:9999'")
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass 
Run Code Online (Sandbox Code Playgroud)

因此,无论路径如何,它都应该在有请求时随时调用处理程序.如果是http://127.0.0.1:9999/http://127.0.0.1:9999/test/this/test/

我在这里了一下http://aiohttp.readthedocs.org/en/stable/web.html#aiohttp-web-variable-handler没有成功找到合适的线索

python python-3.x python-asyncio aiohttp

17
推荐指数
1
解决办法
5318
查看次数

如何编写可选择作为常规函数的asyncio协同程序?

我正在编写一个我希望最终用户可以选择使用的库,就像它的方法和函数不是协同程序一样.

例如,给定此功能:

@asyncio.coroutine
def blah_getter():
    return (yield from http_client.get('http://blahblahblah'))
Run Code Online (Sandbox Code Playgroud)

不关心在自己的代码中使用任何异步功能的最终用户仍然需要导入asyncio并运行:

>>> response = asyncio.get_event_loop().run_until_complete(blah_getter())
Run Code Online (Sandbox Code Playgroud)

如果可以的话,这将是很酷的,blah_getter确定我是否被称为协程,并做出相应的反应.

所以类似于:

@asyncio.coroutine
def blah_getter():
    if magically_determine_if_being_yielded_from():
        return (yield from http_client.get('http://blahblahblah'))
    else:
        el = asyncio.get_event_loop()
        return el.run_until_complete(http_client.get('http://blahblahblah'))
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-asyncio

16
推荐指数
2
解决办法
6006
查看次数

在Python 3.5中使用aiohttp获取多个URL

因为Python 3.5引入了async with在推荐的语法文档aiohttp改变.现在要获得一个网址,他们建议:

import aiohttp
import asyncio

async def fetch(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            return await response.text()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    with aiohttp.ClientSession(loop=loop) as session:
        html = loop.run_until_complete(
            fetch(session, 'http://python.org'))
        print(html)
Run Code Online (Sandbox Code Playgroud)

如何修改此设置以获取网址集合而不仅仅是一个网址?

在旧asyncio示例中,您将设置一个任务列表,例如

    tasks = [
            fetch(session, 'http://cnn.com'),
            fetch(session, 'http://google.com'),
            fetch(session, 'http://twitter.com')
            ]
Run Code Online (Sandbox Code Playgroud)

我试图将这样的列表与上面的方法结合起来但是失败了.

python web-scraping python-3.x python-asyncio aiohttp

11
推荐指数
1
解决办法
5289
查看次数

等待任何未来的asyncio

我正在尝试使用asyncio来处理并发网络I/O. 在一个点上安排了大量的功能,这些功能在每个功能完成所需的时间上变化很大.然后,对于每个输出,在单独的过程中处理接收的数据.

处理数据的顺序是不相关的,因此,考虑到输出的潜在等待时间非常长,我希望await无论将来如何完成而不是预定义的顺序.

def fetch(x):
    sleep()

async def main():
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    for f in futures:
       await f

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

通常,等待期货排队的顺序很好:

表现良好的函数分析器图

蓝色表示每个任务在执行程序队列中的时间,run_in_executor即已被调用,但该函数尚未执行,因为执行程序仅同时运行5个任务; 绿色是执行功能本身所花费的时间; 红色是等待所有先前期货所花费的时间await.

易失性函数分析器图

在我的情况下,函数的时间变化很大,等待队列中的先前期货等待时会有很多时间丢失,而我可以在本地处理GET输出.这使得我的系统空闲一段时间只是在几个输出同时完成时被淹没,然后跳回空闲等待更多请求完成.

有没有办法await在执行者中首先完成任何未来?

python executor python-3.x python-asyncio

8
推荐指数
1
解决办法
4896
查看次数

异常"在新循环上运行时,线程'MainThread'中没有当前事件循环

这是简单的测试代码和结果.

import asyncio

async def test():
    await asyncio.sleep(1)

if __name__ == '__main__':

    asyncio.set_event_loop(None)      # Clear the main loop.
    loop = asyncio.new_event_loop()   # Create a new loop.
    loop.run_until_complete(test())   # Run coroutine over the new loop
Run Code Online (Sandbox Code Playgroud)
Traceback (most recent call last):
  File "test_test.py", line 11, in <module>
    loop.run_until_complete(test())
  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "test_test.py", line 5, in test
    await asyncio.sleep(1)
  File …
Run Code Online (Sandbox Code Playgroud)

python event-loop python-3.x python-asyncio python-3.5

7
推荐指数
1
解决办法
2437
查看次数

Python - 尝试使用意外的mimetype解码JSON:

我最近从请求交换到aiohttp,因为我无法在asyncio循环中使用它.

交换完美,一切顺利,除了一件事.我的控制台充满了

Attempt to decode JSON with unexpected mimetype:
Run Code Online (Sandbox Code Playgroud)

Attempt to decode JSON with unexpected mimetype: txt/html; charset=utf-8
Run Code Online (Sandbox Code Playgroud)

我的代码也有一个网站列表,它也可以抓取JSON,每个网站都不同,但我的循环基本上是相同的,我在这里简化了它:

PoolName = "http://website.com"
endpoint = "/api/stats"
headers = "headers = {'content-type': 'text/html'}" #Ive tried "application/json" and no headers
async with aiohttp.get(url=PoolName+endpoint, headers=headers) as hashrate:
                hashrate = await hashrate.json()
endVariable = hashrate['GLRC']['HASH']
Run Code Online (Sandbox Code Playgroud)

它工作得很好,连接到站点抓取json并正确设置endVariable.但出于某种原因

Attempt to decode JSON with unexpected mimetype:
Run Code Online (Sandbox Code Playgroud)

每次进入循环时打印.这很烦人,因为它会向控制台打印统计信息,并且每次抓取站点json时它们都会在错误中丢失

有没有办法解决这个错误或隐藏它?

python json python-3.x python-asyncio aiohttp

7
推荐指数
2
解决办法
4987
查看次数