标签: python-asyncio

为什么asyncio的run_in_executor阻止龙卷风的get处理程序?

我想在龙卷风的异步GET请求处理程序中运行一个缓慢的阻止方法(实际上是从第三方库中)。将该方法设为:

def blocking_method(uid):
    print("slow method started: ", uid)
    time.sleep(10)
    print("slow method done: ", uid)
    return "slow method ({}) result".format(uid)
Run Code Online (Sandbox Code Playgroud)

此外,我更喜欢在asyncio的事件循环中运行龙卷风服务器:

if __name__ == '__main__':
    tornado.platform.asyncio.AsyncIOMainLoop().install()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(make_app())
    loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

我知道@run_in_executor装饰器,但是它不适合我,因为我使用asyncio。要在异步协程中运行阻止方法,应使用的run_in_executor方法asyncio.get_event_loop()。这是一个示例,如何从此答案中做到这一点

import asyncio

async def main():
    loop = asyncio.get_event_loop()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    future1 = loop.run_in_executor(executor, blocking_method, 1)
    future2 = loop.run_in_executor(executor, blocking_method, 2)
    response1 = await future1
    response2 = await future2
    print(response1)
    print(response2)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

而且效果很好,这是先前脚本的输出:

slow …
Run Code Online (Sandbox Code Playgroud)

python tornado python-asyncio

0
推荐指数
1
解决办法
1521
查看次数

为什么 python asyncio Loop.call_soon 会覆盖数据?

我在代码中创建了一个难以追踪的错误,但不明白为什么会发生。当多次推送相同的异步函数以立即调用时会出现问题。同步函数不会发生这种情况。

这是该问题的运行示例:

import asyncio
import sys

class TestObj(object):

    def __init__(self):

        self.test_data = {'a': 1, 'b': 2, 'c': 3}
        self.loop = asyncio.get_event_loop()
        self.loop.call_later(1, lambda: asyncio.ensure_future(self.calling_func()))
        self.loop.call_later(2, self.calling_func_sync)
        self.loop.call_later(4, sys.exit)
        self.loop.run_forever()

    async def do_something(self, k, v):
        print("Values", k, v)

    async def calling_func(self):
        for k, v in self.test_data.items():
            print("Sending", k, v)
            self.loop.call_soon(lambda: asyncio.ensure_future(self.do_something(k, v)))

    def do_something_sync(self, k, v):
        print("Values_sync", k, v)

    def calling_func_sync(self):
        for k, v in self.test_data.items():
            print("Sending_sync", k, v)
            self.loop.call_soon(self.do_something_sync, k, v)


if __name__ == "__main__":
    a = TestObj()
Run Code Online (Sandbox Code Playgroud)

输出是:

Sending …
Run Code Online (Sandbox Code Playgroud)

python event-loop python-asyncio

0
推荐指数
1
解决办法
1386
查看次数

Go lang中的Python asyncio事件循环等价物

asyncioPython3.x 中使用异步/并发事件循环。

asyncio在 Go lang 中是否有任何等效或协程具有使用线程的并发性能?


[注意]:

不是并行+并发(多处理)模式。


[更新]:

这是一个asyncio在 Python 中使用的异步事件循环,以便更好地理解:

import asyncio
import time

async def async_say(delay, msg):
    await asyncio.sleep(delay)
    print(msg)

async def main():
    task1 = asyncio.ensure_future(async_say(4, 'hello'))
    task2 = asyncio.ensure_future(async_say(6, 'world'))

    print(f"started at {time.strftime('%X')}")
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

出去:

started at 13:19:44
hello
world
finished at 13:19:50
Run Code Online (Sandbox Code Playgroud)

任何帮助将不胜感激。

concurrency asynchronous go python-3.x python-asyncio

0
推荐指数
1
解决办法
2493
查看次数

获取字符串形式的 aiohttp 结果

我正在尝试使用 python 中的异步从网站获取数据。作为示例,我使用了此代码(在“更好的协程示例”下):https ://www.blog.pythonlibrary.org/2016/07/26/python-3-an-intro-to-asyncio/

现在这工作正常,但它将二进制块写入文件,而我不希望它在文件中。我想要直接得到结果数据。但我目前有一个协程对象列表,我无法从中获取数据。

代码:

# -*- coding: utf-8 -*-
import aiohttp
import asyncio
import async_timeout

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            return await response.text()


async def main(loop, urls):
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [fetch(session, url) for url in urls]
        await asyncio.gather(*tasks)
        return tasks

# time normal way of retrieval
if __name__ == '__main__':
    urls = [a list of urls..]

    loop = asyncio.get_event_loop()
    details_async = loop.run_until_complete(main(loop, urls))
Run Code Online (Sandbox Code Playgroud)

谢谢

python python-asyncio aiohttp

0
推荐指数
1
解决办法
2185
查看次数

python async wait 无法返回元组

我想创建一个从网站异步下载的功能。我需要将下载结果连接到输入参数,以便我可以在下载后使用结果和参数。

我目前有以下内容:

async def download(session, url, var1, var2):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            return await (response.read(), url, var1, var2)

async def loop_download(loop, urls, var1s, var2s):
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [download(session, url, var1, var2) for url, var1, var2 in zip(urls, var1s, var2s)]
        results = await asyncio.gather(*tasks)
        return results

loop = asyncio.get_event_loop()
results = loop.run_until_complete(loop_download(loop, urls, var1s, var2s))
Run Code Online (Sandbox Code Playgroud)

然而这会返回一个错误:

TypeError: object tuple can't be used in 'await' expression
Run Code Online (Sandbox Code Playgroud)

如何将一些输入数据(例如网址)加入到结果中,以便我可以使用它进行进一步分析?

python async-await python-asyncio aiohttp

0
推荐指数
1
解决办法
5171
查看次数

获取 ServerDisconnectedError 异常,Connection.release() 会帮助解决这个问题吗?

我的代码出现了一些问题。我有一个客户端会话,它通过请求与网站进行通信。

问题是,当我长时间运行代码时,我开始收到一些错误,例如ClientResponseError, ServerDisconnectedError, Error 101。所以我正在阅读文档,我看到了这个:

release()
将连接释放回连接器。
底层套接字未关闭,如果连接超时(默认为 30 秒)未过期,则稍后可以重用连接。

但我不明白。有人可以简单地解释一下吗?它会解决我的问题吗?

session = aiohttp.ClientSession(cookie_jar=cookiejar)
while True:
    await session.post('https://anywhere.com', data={'{}': ''})
Run Code Online (Sandbox Code Playgroud)

python python-asyncio aiohttp

0
推荐指数
1
解决办法
3158
查看次数

inotify 文件描述符上的 os.read:读取 32 字节有效但 31 引发异常

我正在编写一个应该使用 inotify 响应文件更改的程序。下面的骨架程序按我的预期工作......

# test.py
import asyncio
import ctypes
import os

IN_CLOSE_WRITE = 0x00000008

async def main(loop):
    libc = ctypes.cdll.LoadLibrary('libc.so.6')
    fd = libc.inotify_init()

    os.mkdir('directory-to-watch')
    wd = libc.inotify_add_watch(fd, 'directory-to-watch'.encode('utf-8'), IN_CLOSE_WRITE)
    loop.add_reader(fd, handle, fd)

    with open(f'directory-to-watch/file', 'wb') as file:
        pass

def handle(fd):
    event_bytes = os.read(fd, 32)
    print(event_bytes)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
Run Code Online (Sandbox Code Playgroud)

...因为它输出...

b'\x01\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x00file\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Run Code Online (Sandbox Code Playgroud)

但是,如果我将其更改为尝试读取 31 个字节...

b'\x01\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x00file\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Run Code Online (Sandbox Code Playgroud)

......然后它引发了一个异常......

Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/t.py", line 19, in handle
    event_bytes = os.read(fd, 31)
OSError: …
Run Code Online (Sandbox Code Playgroud)

linux ctypes inotify python-3.x python-asyncio

0
推荐指数
1
解决办法
133
查看次数

如何在 asyncio create_task 中更新全局变量

我目前有一个未在整个应用程序中设置的全局变量。我有两个文件,其中 file2 从 file1 导入。全局在 file1 中初始化。

这是初始化全局变量并稍后在 file1 中使用它的代码。

import time
import asyncio

#Initialize global
CONNECTION_OPEN = False

async def calculate_idle(t):
    orig_time = t
    global CONNECTION_OPEN
    while True:
        await asyncio.sleep(5)
        print("GLOBAL CONNECTION", CONNECTION_OPEN)
        if CONNECTION_OPEN:
            print("This value is now true")
        else:
             print("Value is still false")
Run Code Online (Sandbox Code Playgroud)

这是将全局设置为 true 的 websocket 代码。它位于文件 2 中。

import os
import asyncio
import websockets
import json
import threading
import time
from random import randrange
from enum import Enum
from lights import calculate_idle,CONNECTION_OPEN 

async def init_connection(message): …
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-asyncio

0
推荐指数
1
解决办法
2199
查看次数

使用异步 Python 3 的并发 HTTP 和 SQL 请求

第一次尝试asyncio并且aiohttp。我有以下urlsMySQL数据库获取GET请求的代码。获取响应并将其推送到MySQL数据库。

if __name__ == "__main__":
    database_name = 'db_name'
    company_name = 'company_name'

    my_db = Db(database=database_name) # wrapper class for mysql.connector
    urls_dict = my_db.get_rest_api_urls_for_specific_company(company_name=company_name)
    update_id = my_db.get_updateid()
    my_db.get_connection(dictionary=True)

    for url in urls_dict:
        url_id = url['id']
        url = url['url']
        table_name = my_db.make_sql_table_name_by_url(url)
        insert_query = my_db.get_sql_for_insert(table_name)
        r = requests.get(url=url).json() # make the request
        args = [json.dumps(r), update_id, url_id]
        my_db.db_execute_one(insert_query, args, close_conn=False)

    my_db.close_conn()
Run Code Online (Sandbox Code Playgroud)

这工作正常,但要加快速度我该如何运行它asynchronously

我看过这里这里这里,但似乎无法理解它。 …

python python-asyncio aiohttp

0
推荐指数
1
解决办法
3051
查看次数

asyncio.Queue 卡住了 1 个协程添加到队列,1 个协程从队列中获取

在我下面的简单 asyncio 代码中,应用程序有一个任务self.add_item_loop_task不断地向asyncio.Queuenamed 中添加一个整数self.queue,而第二个任务则self.get_item_loop_task不断地等待将某些内容添加到队列中并将print其退出。

但是,这个应用程序0在我运行时只打印一次,然后卡在那里。我相信循环self.get_item_loop_task没有继续。为什么会这样?

import asyncio

class App:
    def __init__(self):
        self.queue = asyncio.Queue()

    async def start(self):
        self.add_item_loop_task = asyncio.create_task(self.add_item_loop())
        self.get_item_loop_task = asyncio.create_task(self.get_item_loop())
        await asyncio.wait(
            [
                self.add_item_loop_task,
                self.get_item_loop_task,
            ]
        )

    async def stop(self):
        self.add_item_loop_task.cancel()
        self.get_item_loop_task.cancel()

    async def add_item_loop(self):
        i = 0
        while True:
            await self.queue.put(i)
            i += 1
            await asyncio.sleep(1)

    async def get_item_loop(self):
        while True:
            item = await self.queue.get()
            print(item)


app = App()
try:
    asyncio.run(app.start())
except …
Run Code Online (Sandbox Code Playgroud)

python queue coroutine python-3.x python-asyncio

0
推荐指数
1
解决办法
60
查看次数