我一直在阅读python 3中的asyncio模块,更广泛地讲述python中的协同程序,我无法得到使asyncio成为如此优秀工具的原因.我觉得你可以用协同程序做所有事情,你可以通过使用基于多处理模块的任务队列(例如芹菜)来做得更好.是否存在协同程序优于任务队列的用例?
我之前可以看到类似的问题但这些问题是运行多处理器而不是执行程序.因此,我不确定如何解决这个问题.
GitHub问题也说它在4.1中解决了https://github.com/celery/celery/issues/1709
我在用
celery==4.1.1
django-celery==3.2.1
django-celery-beat==1.0.1
django-celery-results==1.0.1
Run Code Online (Sandbox Code Playgroud)
我的脚本如下,我试图将其删除以仅显示相关代码.
@asyncio.coroutine
def snmp_get(ip, oid, snmp_user, snmp_auth, snmp_priv):
results=[]
snmpEngine = SnmpEngine()
errorIndication, errorStatus, errorIndex, varBinds = yield from getCmd(
...
)
...
for varBind in varBinds:
results.append(' = '.join([x.prettyPrint() for x in varBind]))
snmpEngine.transportDispatcher.closeDispatcher()
return results
def create_link_data_record(link_data):
obj = LinkData.objects.create(
...
)
return 'data polled for {} record {} created'.format(link_data.hostname, obj.id)
async def retrieve_data(link, loop):
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor(2)
poll_interval = 60
results = []
# credentials: …Run Code Online (Sandbox Code Playgroud) 我使用 websockets(AsyncWebsocketConsumer、django-channels)进行网络聊天。我正在使用 celery 来解析请求,但每次我尝试将响应发送回消费者时,它都会停止,没有可调试的(对我来说)错误。
这次尝试给了我下一个错误:
@shared_task
def execute(command, parameter, room_group_name):
if command == '/stock':
loop = asyncio.get_event_loop()
loop.run_until_complete(sendData(stock(parameter), "BOT", room_group_name))
return True
loop = asyncio.get_event_loop()
loop.run_until_complete(sendData("I do not understand that parameter", "BOT", room_group_name))
return True
from channels.layers import get_channel_layer
async def sendData(message, from_, room_group_name):
channel_layer = get_channel_layer()
import datetime
currentDT = datetime.datetime.now()
datetime = currentDT.strftime("%Y-%m-%d %H:%M:%S")
await channel_layer.group_send(
room_group_name,
{
'type': 'chat_message',
'username': from_,
'datetime': datetime,
'message': message
}
)
await asyncio.sleep(5)
Run Code Online (Sandbox Code Playgroud)
错误:
[2019-05-12 18:01:15,491: ERROR/ForkPoolWorker-1] Task chat.tasks.execute[8a69afca-8173-46d0-84bc-4ee5ce7782ca] raised unexpected: …Run Code Online (Sandbox Code Playgroud) 我有一个当前使用 asyncio 编写的后端应用程序:用于 Web 服务器的 fastapi,用于异步数据库驱动程序的 sqlalchemy 1.4 + asyncpg。我需要将任务部署给将运行和更新主机应用程序的工作人员。目前我正在使用aio_pika,但想要更强大的东西,例如celerywith flower。
我知道 celery 没有与 asyncio 集成。我也读过这样的答案,我担心的是任务不是异步的,这是微不足道的。我担心从主事件循环内启动任务。
我的主要问题是my_task.delay()/是否完全my_task.apply_async()阻塞正在运行的线程?如果是这样,更好的方法是使用来自中央或 a的多处理工作人员,然后仅从该工作人员进程部署 celery 任务吗?getmp.QueueProcessPoolExecutor
我想要部署任务,并且最好在任务完成时收到通知。不过,这可以通过界面在任务本身内完成fastapi。我只是想确保部署任务不会阻止异步事件循环。
我有以下内容,但由于某种原因,我没有得到查询结果,而是得到了其他东西
这是我使用的Python包
这是文档
from databases import Database
database=Database('postgres://redacted')
await database.connect()
...
...
...
query = "SELECT orders.id AS orders_id, orders.notification_method AS orders_notification_method WHERE shipped=True"
result = await database.fetch_all(query=query))
Run Code Online (Sandbox Code Playgroud)
这是我得到的而不是获取查询结果
打印(结果)
[<databases.backends.postgres.Record object at 0x7fb3f415ac50>, <databases.backends.postgres.Record object at 0x7fb3f415ae30>, <databases.backends.postgres.Record object at 0x7fb3f415a470>, <databases.backends.postgres.Record object at 0x7fb3f415af50>, <databases.backends.postgres.Record object at 0x7fb3f415ad70>, <databases.backends.postgres.Record object at 0x7fb3f415ab30>, <databases.backends.postgres.Record object at 0x7fb3f415a7d0>, <databases.backends.postgres.Record object at 0x7fb3f415ae90>]
Run Code Online (Sandbox Code Playgroud)
type 表示这是一个列表
打印(类型(结果))
<class 'list'>
Run Code Online (Sandbox Code Playgroud)
如何返回 SQL 查询的实际结果(即返回查询中的所有行)?
这是
sqlalchemy有效的版本,但使用databases包无法正常工作,如上所述
我非常想要实现的是从 sqlalchemy …
python ×4
celery ×3
asynchronous ×1
celery-task ×1
fastapi ×1
postgresql ×1
sqlalchemy ×1
task ×1
websocket ×1