我想在龙卷风的异步GET请求处理程序中运行一个缓慢的阻止方法(实际上是从第三方库中)。将该方法设为:
def blocking_method(uid):
print("slow method started: ", uid)
time.sleep(10)
print("slow method done: ", uid)
return "slow method ({}) result".format(uid)
Run Code Online (Sandbox Code Playgroud)
此外,我更喜欢在asyncio的事件循环中运行龙卷风服务器:
if __name__ == '__main__':
tornado.platform.asyncio.AsyncIOMainLoop().install()
loop = asyncio.get_event_loop()
loop.run_until_complete(make_app())
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
我知道@run_in_executor装饰器,但是它不适合我,因为我使用asyncio。要在异步协程中运行阻止方法,应使用的run_in_executor方法asyncio.get_event_loop()。这是一个示例,如何从此答案中做到这一点:
import asyncio
async def main():
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
future1 = loop.run_in_executor(executor, blocking_method, 1)
future2 = loop.run_in_executor(executor, blocking_method, 2)
response1 = await future1
response2 = await future2
print(response1)
print(response2)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
而且效果很好,这是先前脚本的输出:
slow …Run Code Online (Sandbox Code Playgroud) 我在代码中创建了一个难以追踪的错误,但不明白为什么会发生。当多次推送相同的异步函数以立即调用时会出现问题。同步函数不会发生这种情况。
这是该问题的运行示例:
import asyncio
import sys
class TestObj(object):
def __init__(self):
self.test_data = {'a': 1, 'b': 2, 'c': 3}
self.loop = asyncio.get_event_loop()
self.loop.call_later(1, lambda: asyncio.ensure_future(self.calling_func()))
self.loop.call_later(2, self.calling_func_sync)
self.loop.call_later(4, sys.exit)
self.loop.run_forever()
async def do_something(self, k, v):
print("Values", k, v)
async def calling_func(self):
for k, v in self.test_data.items():
print("Sending", k, v)
self.loop.call_soon(lambda: asyncio.ensure_future(self.do_something(k, v)))
def do_something_sync(self, k, v):
print("Values_sync", k, v)
def calling_func_sync(self):
for k, v in self.test_data.items():
print("Sending_sync", k, v)
self.loop.call_soon(self.do_something_sync, k, v)
if __name__ == "__main__":
a = TestObj()
Run Code Online (Sandbox Code Playgroud)
输出是:
Sending …Run Code Online (Sandbox Code Playgroud) 我asyncio在Python3.x 中使用异步/并发事件循环。
asyncio在 Go lang 中是否有任何等效或协程具有使用线程的并发性能?
[注意]:
不是并行+并发(多处理)模式。
[更新]:
这是一个asyncio在 Python 中使用的异步事件循环,以便更好地理解:
import asyncio
import time
async def async_say(delay, msg):
await asyncio.sleep(delay)
print(msg)
async def main():
task1 = asyncio.ensure_future(async_say(4, 'hello'))
task2 = asyncio.ensure_future(async_say(6, 'world'))
print(f"started at {time.strftime('%X')}")
await task1
await task2
print(f"finished at {time.strftime('%X')}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
出去:
started at 13:19:44
hello
world
finished at 13:19:50
Run Code Online (Sandbox Code Playgroud)
任何帮助将不胜感激。
我正在尝试使用 python 中的异步从网站获取数据。作为示例,我使用了此代码(在“更好的协程示例”下):https ://www.blog.pythonlibrary.org/2016/07/26/python-3-an-intro-to-asyncio/
现在这工作正常,但它将二进制块写入文件,而我不希望它在文件中。我想要直接得到结果数据。但我目前有一个协程对象列表,我无法从中获取数据。
代码:
# -*- coding: utf-8 -*-
import aiohttp
import asyncio
import async_timeout
async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url) as response:
return await response.text()
async def main(loop, urls):
async with aiohttp.ClientSession(loop=loop) as session:
tasks = [fetch(session, url) for url in urls]
await asyncio.gather(*tasks)
return tasks
# time normal way of retrieval
if __name__ == '__main__':
urls = [a list of urls..]
loop = asyncio.get_event_loop()
details_async = loop.run_until_complete(main(loop, urls))
Run Code Online (Sandbox Code Playgroud)
谢谢
我想创建一个从网站异步下载的功能。我需要将下载结果连接到输入参数,以便我可以在下载后使用结果和参数。
我目前有以下内容:
async def download(session, url, var1, var2):
with async_timeout.timeout(10):
async with session.get(url) as response:
return await (response.read(), url, var1, var2)
async def loop_download(loop, urls, var1s, var2s):
async with aiohttp.ClientSession(loop=loop) as session:
tasks = [download(session, url, var1, var2) for url, var1, var2 in zip(urls, var1s, var2s)]
results = await asyncio.gather(*tasks)
return results
loop = asyncio.get_event_loop()
results = loop.run_until_complete(loop_download(loop, urls, var1s, var2s))
Run Code Online (Sandbox Code Playgroud)
然而这会返回一个错误:
TypeError: object tuple can't be used in 'await' expression
Run Code Online (Sandbox Code Playgroud)
如何将一些输入数据(例如网址)加入到结果中,以便我可以使用它进行进一步分析?
我的代码出现了一些问题。我有一个aiohttp客户端会话,它通过请求与网站进行通信。
问题是,当我长时间运行代码时,我开始收到一些错误,例如ClientResponseError, ServerDisconnectedError, Error 101。所以我正在阅读文档,我看到了这个:
release()
将连接释放回连接器。
底层套接字未关闭,如果连接超时(默认为 30 秒)未过期,则稍后可以重用连接。
但我不明白。有人可以简单地解释一下吗?它会解决我的问题吗?
session = aiohttp.ClientSession(cookie_jar=cookiejar)
while True:
await session.post('https://anywhere.com', data={'{}': ''})
Run Code Online (Sandbox Code Playgroud) 我正在编写一个应该使用 inotify 响应文件更改的程序。下面的骨架程序按我的预期工作......
# test.py
import asyncio
import ctypes
import os
IN_CLOSE_WRITE = 0x00000008
async def main(loop):
libc = ctypes.cdll.LoadLibrary('libc.so.6')
fd = libc.inotify_init()
os.mkdir('directory-to-watch')
wd = libc.inotify_add_watch(fd, 'directory-to-watch'.encode('utf-8'), IN_CLOSE_WRITE)
loop.add_reader(fd, handle, fd)
with open(f'directory-to-watch/file', 'wb') as file:
pass
def handle(fd):
event_bytes = os.read(fd, 32)
print(event_bytes)
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
Run Code Online (Sandbox Code Playgroud)
...因为它输出...
b'\x01\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x00file\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Run Code Online (Sandbox Code Playgroud)
但是,如果我将其更改为尝试读取 31 个字节...
b'\x01\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x00file\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Run Code Online (Sandbox Code Playgroud)
......然后它引发了一个异常......
Traceback (most recent call last):
File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "/t.py", line 19, in handle
event_bytes = os.read(fd, 31)
OSError: …Run Code Online (Sandbox Code Playgroud) 我目前有一个未在整个应用程序中设置的全局变量。我有两个文件,其中 file2 从 file1 导入。全局在 file1 中初始化。
这是初始化全局变量并稍后在 file1 中使用它的代码。
import time
import asyncio
#Initialize global
CONNECTION_OPEN = False
async def calculate_idle(t):
orig_time = t
global CONNECTION_OPEN
while True:
await asyncio.sleep(5)
print("GLOBAL CONNECTION", CONNECTION_OPEN)
if CONNECTION_OPEN:
print("This value is now true")
else:
print("Value is still false")
Run Code Online (Sandbox Code Playgroud)
这是将全局设置为 true 的 websocket 代码。它位于文件 2 中。
import os
import asyncio
import websockets
import json
import threading
import time
from random import randrange
from enum import Enum
from lights import calculate_idle,CONNECTION_OPEN
async def init_connection(message): …Run Code Online (Sandbox Code Playgroud) 第一次尝试asyncio并且aiohttp。我有以下urls从MySQL数据库获取GET请求的代码。获取响应并将其推送到MySQL数据库。
if __name__ == "__main__":
database_name = 'db_name'
company_name = 'company_name'
my_db = Db(database=database_name) # wrapper class for mysql.connector
urls_dict = my_db.get_rest_api_urls_for_specific_company(company_name=company_name)
update_id = my_db.get_updateid()
my_db.get_connection(dictionary=True)
for url in urls_dict:
url_id = url['id']
url = url['url']
table_name = my_db.make_sql_table_name_by_url(url)
insert_query = my_db.get_sql_for_insert(table_name)
r = requests.get(url=url).json() # make the request
args = [json.dumps(r), update_id, url_id]
my_db.db_execute_one(insert_query, args, close_conn=False)
my_db.close_conn()
Run Code Online (Sandbox Code Playgroud)
这工作正常,但要加快速度我该如何运行它asynchronously?
在我下面的简单 asyncio 代码中,应用程序有一个任务self.add_item_loop_task不断地向asyncio.Queuenamed 中添加一个整数self.queue,而第二个任务则self.get_item_loop_task不断地等待将某些内容添加到队列中并将print其退出。
但是,这个应用程序0在我运行时只打印一次,然后卡在那里。我相信循环self.get_item_loop_task没有继续。为什么会这样?
import asyncio
class App:
def __init__(self):
self.queue = asyncio.Queue()
async def start(self):
self.add_item_loop_task = asyncio.create_task(self.add_item_loop())
self.get_item_loop_task = asyncio.create_task(self.get_item_loop())
await asyncio.wait(
[
self.add_item_loop_task,
self.get_item_loop_task,
]
)
async def stop(self):
self.add_item_loop_task.cancel()
self.get_item_loop_task.cancel()
async def add_item_loop(self):
i = 0
while True:
await self.queue.put(i)
i += 1
await asyncio.sleep(1)
async def get_item_loop(self):
while True:
item = await self.queue.get()
print(item)
app = App()
try:
asyncio.run(app.start())
except …Run Code Online (Sandbox Code Playgroud) python-asyncio ×10
python ×8
aiohttp ×4
python-3.x ×4
async-await ×1
asynchronous ×1
concurrency ×1
coroutine ×1
ctypes ×1
event-loop ×1
go ×1
inotify ×1
linux ×1
queue ×1
tornado ×1