我有一个带有协程方法的事件循环,使用asyncio.
我正在寻找使用uvloop替代以下示例的等效项。
这是一个简单的asyncio事件循环示例:
import asyncio
async def read(**kwargs):
oid = kwargs.get('oid', '0.0.0.0.0.0')
time = kwargs.get('time', 1)
try:
print('start: ' + oid)
except Exception as exc:
print(exc)
finally:
await asyncio.sleep(time)
print('terminate: ' + oid)
def event_loop(configs):
loop = asyncio.get_event_loop()
for conf in configs:
asyncio.ensure_future(read(oid=conf['oid'], time=conf['time']))
return loop
if __name__ == '__main__':
snmp_configurations = [
{'time': 5, 'oid': '1.3.6.3.2.4'},
{'time': 6, 'oid': '1.3.6.3.5.8'},
] # TODO :: DUMMY
loop = event_loop(snmp_configurations)
try:
loop.run_forever()
except KeyboardInterrupt:
pass …Run Code Online (Sandbox Code Playgroud) Python 可以在等待 asyncio.sleep 和 aiohttp 函数时切换上下文。python 最终如何知道上下文何时需要切换?如果我想实现自己的IO功能,需要调用哪些API?
基于http://www.dabeaz.com/coroutines/Coroutines.pdf,我们可以在使用yield编程时使用select api让上下文切换。在带有async和await的python 3中,它仍然是唯一的方法吗?
我有以下代码
session = aiohttp.ClientSession()
async def fetch(session, url):
while True:
try:
async with session.get(url) as response:
assert response.status == 200
return await response.json()
except Exception as error:
print(error)
class FPL():
async def get_player_summaries(self, player_ids=[], return_json=False):
tasks = [asyncio.ensure_future(
fetch(session, API_URLS["player"].format(player_id)))
for player_id in player_ids]
player_summaries = await asyncio.gather(*tasks)
if return_json:
return player_summaries
return [PlayerSummary(player_summary)
for player_summary in player_summaries]
async def get_points_against(self):
players = await self.get_players(return_json=True)
player_ids = [player["id"] for player in players]
player_summaries = await self.get_player_summaries(
player_ids, return_json=True)
points_against = …Run Code Online (Sandbox Code Playgroud) 我在 python 3.7 上运行 django 2.2.3。我想要一个带有 asyncio 的解决方案,以便 api 可以只调用异步函数并返回响应,而无需等待我们使用 jquery Promise 执行操作的方式。定义my_coro只是举例。我将运行moviepy通常需要 40-50 秒才能完成的功能。我不希望 api 等待那么长时间才能发送响应。我也对如何处理线程池感到困惑。这里如何使用线程池呢?因为我也打算让电影迭代更快。那么如何创建一个处理调用的池my_coro呢?
async def my_coro(n):
print(f"The answer is {n}.")
async def main():
await asyncio.gather(my_coro(1),my_coro(2),my_coro(3),my_coro(4))
class SaveSnaps(APIView):
def post(self, request, format = None):
if request.user.is_anonymous:
return Response({"response":"FORBIDDEN"}, status = 403)
else:
try:
asyncio.run(main())
return Response({"response": "success"}, status = 200)
except Exception as e:
return Response({'response':str(e)}, status = 400)
Run Code Online (Sandbox Code Playgroud)
更新:
我尝试使用芹菜。但由于我不会使用周期性任务,实际上我需要异步接收 blob 数组作为参数的方法。celerytask.delay给我一个错误,因为任务需要可序列化的参数。所以我回到了这一点。我不确定我是否应该坚持使用线程解决方案或其他解决方案。
更新:我忘了分享我最后做了什么。我转移到celery. 但由于预期celery …
python django asynchronous django-rest-framework python-asyncio
我运行这个测试代码:
import telethon.sync
from telethon import TelegramClient
from telethon.tl.functions.messages import AddChatUserRequest
from telethon.tl.functions.contacts import ImportContactsRequest
api_id = XXXXXXX
api_hash = 'XXXXXXXXXXXXXXC'
with TelegramClient('anon', api_id, api_hash) as client:
async def main():
client(AddChatUserRequest(-XXXXXXXXXXXXXX, ['username'], fwd_limit=10))
main()
Run Code Online (Sandbox Code Playgroud)
它给了我这个:
/data/data/ru.iiec.pydroid3/files/temp_iiec_codefile.py:19: RuntimeWarning: coroutine 'main' was never awaited
main()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Run Code Online (Sandbox Code Playgroud)
我应该怎么做才能使程序正常运行?
import asyncio
from multiprocessing import Queue, Process
import time
task_queue = Queue()
# This is simulating the task
async def do_task(task_number):
for progress in range(task_number):
print(f'{progress}/{task_number} doing')
await asyncio.sleep(10)
# This is the loop that accepts and runs tasks
async def accept_tasks():
event_loop = asyncio.get_event_loop()
while True:
task_number = task_queue.get() <-- this blocks event loop from running do_task()
event_loop.create_task(do_task(task_number))
# This is the starting point of the process,
# the event loop runs here
def worker():
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(accept_tasks()) …Run Code Online (Sandbox Code Playgroud) 我正在使用 FastApi 并且有一个端点。
我有两个长时间运行的函数,我想使用 asyncio 同时运行
因此,我创建了两个函数:
async def get_data_one():
return 'done_one'
async def get_data_two():
return 'done_two'
Run Code Online (Sandbox Code Playgroud)
这些函数从外部 Web 服务获取数据。
我想同时执行它们,所以我创建了另一个函数来执行它:
async def get_data():
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
task_1 = loop.create_task(get_data_one)
task_2 = loop.create_task(get_data_two)
tasks = (task_1, task_2)
first, second = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
# I will then perform cpu intensive computation on the results
# for now - assume i am just concatenating the results
return first + second
Run Code Online (Sandbox Code Playgroud)
最后,我有我的终点:
@app.post("/result")
async def get_result_async():
r = await get_data()
return r …Run Code Online (Sandbox Code Playgroud) 我正在尝试为 Django 网站制作一个不和谐的机器人。在实现的时候我发现Django数据库不允许异步操作,我们必须使用线程或sync_to_async.
我使用过sync_to_async并且似乎有效。但后来我遇到了一个问题,我什至无法弄清楚它出了什么问题以及那里发生了什么。
谁能解释我在那里做错了什么以及那里发生了什么?
@sync_to_async
def get_customer(id):
try:
return Customer.objects.get(discord_id=id)
except:
return None
#------
@bot.command()
async def categories(ctx):
customer = await get_customer(ctx.author.id) #this line of code works well
categories = await sync_to_async(Category.objects.all().filter)(customer=customer)
#categories = await sync_to_async(Category.objects.filter)(customer=customer)#I tried this also, didn't work
print(categories) #problem is in the line
embed = Embed(title='', description="{}".format('\n'.join([ x.name for x in categories])))
await ctx.send(embed=embed)
Run Code Online (Sandbox Code Playgroud)
我收到的回溯错误是:
Running bot
Ignoring exception in command categories:
Traceback (most recent call last):
File "/Users/rhidwan/Desktop/personal_transaction/venv/lib/python3.7/site-packages/discord/ext/commands/core.py", line 83, …Run Code Online (Sandbox Code Playgroud) 我刚刚开始尝试 python 中的 asyncio 库。我的目的是加快我的代码速度,但是对于我的第一个脚本来说,确实没有比不使用 asyncio 的情况有任何改进。
from yahoo_fin import stock_info as si
from yahoo_fin.stock_info import *
import time
import asyncio
async def price(stock):
prijs = str(si.get_live_price(stock))
await asyncio.sleep(0.001)
print(prijs)
def main():
loop = asyncio.get_event_loop()
t0 = time.time()
task = asyncio.gather(
price('aapl'),
price('fcx'),
price('acn'),
price('aapl'),
price('fcx'),
price('acn'),
price('aapl'),
price('fcx'),
price('acn'),
price('adbe'),
price('aapl'),
price('fcx'),
price('acn'),
price('aapl'),
price('fcx'),
price('acn'),
price('aapl'),
price('fcx'),
price('acn'),
price('adbe')
)
loop.run_until_complete(task)
t1 = time.time()
print("took %.2f ms" % (1000*(t1-t0)))
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
如果我比较它而不对其进行异步编码:
from yahoo_fin import …Run Code Online (Sandbox Code Playgroud) 我正在编写一个Python程序,它同时运行从队列中获取的任务,以学习 asyncio.
通过与主线程(在 REPL 内)交互,项目将被放入队列中。每当一个任务被放入队列时,它应该立即被消耗并执行。我的方法是启动一个单独的线程并将队列传递到该线程内的事件循环。
这些任务正在运行,但只是按顺序运行,我不清楚如何同时运行这些任务。我的尝试如下:
import asyncio
import time
import queue
import threading
def do_it(task_queue):
'''Process tasks in the queue until the sentinel value is received'''
_sentinel = 'STOP'
def clock():
return time.strftime("%X")
async def process(name, total_time):
status = f'{clock()} {name}_{total_time}:'
print(status, 'START')
current_time = time.time()
end_time = current_time + total_time
while current_time < end_time:
print(status, 'processing...')
await asyncio.sleep(1)
current_time = time.time()
print(status, 'DONE.')
async def main():
while True:
item = task_queue.get()
if item == _sentinel:
break
await …Run Code Online (Sandbox Code Playgroud) python ×10
python-asyncio ×10
async-await ×3
python-3.x ×3
asynchronous ×2
django ×2
aiohttp ×1
concurrency ×1
discord.py ×1
django-orm ×1
fastapi ×1
queue ×1
task ×1
telethon ×1
uvloop ×1