我正在寻找一个提供以下功能的WorkQueue API:
java.util.Queue 兼容jdk中有很多有趣的实现,例如java.util.DelayQueue我可以使用它.我只是想确保我不重新发明轮子.
我知道您可以在仪表板或开发服务器控制台中查看当前排队和正在运行的任务.但是,有没有办法以编程方式获取该列表?文档仅描述如何向队列添加任务,而不描述如何列出和/或取消它们.
请在python中.
是否可以以编程方式查询Task Queue API以查看当前正在执行/挂起的任务数量?
我没有看到任何方法在API中执行此操作,因此我使用在数据存储区中创建对象来表示排队的任务.运行时,任务会从数据存储中删除相应的条目.
可以想象,这很容易失去同步.我真的很高兴能够在队列中获得给定队列名称的简单任务计数.
我希望保护我的任务队列URL免受恶意访问.
在任务队列请求我的视图中:
if not users.is_current_user_admin():
return HttpResponse(status=403)
Run Code Online (Sandbox Code Playgroud)
但我的任务队列收到403错误!我从这个GAE文档中得到的印象是,任务队列用户已被gauranteed称为管理员.是什么赋予了?
注意:我正在使用DjangoNonRel,所以我无法在我的指定管理员唯一的URL访问app.yaml,我必须在视图中以编程方式.
django google-app-engine task-queue django-nonrel djangoappengine
我正在寻找能够概述"排队"的文章和参考文献(我可能在这里没有使用正确的术语).我希望通过Redis,RabbitMQ,Celery,Kombu以及其他任何我尚未阅读的组件,以及它们如何组合在一起的介绍性风格指南.
我的问题是我需要排队由我的Django网站发布的后台任务,我阅读的每篇博客和文章都推荐不同的解决方案.
我有几个我想在后台执行的操作,但它们必须一个接一个地同步执行.
我想知道使用Task.ContinueWith方法实现这一点是否是个好主意.你预见到这有什么问题吗?
我的代码看起来像这样:
private object syncRoot =new object();
private Task latestTask;
public void EnqueueAction(System.Action action)
{
lock (syncRoot)
{
if (latestTask == null)
latestTask = Task.Factory.StartNew(action);
else
latestTask = latestTask.ContinueWith(tsk => action());
}
}
Run Code Online (Sandbox Code Playgroud) 基本上我有很多任务(大约1000个批次),这些任务的执行时间可以有很大的不同(从不到几秒到10分钟).我知道如果一项任务执行超过一分钟我就可以杀死它.这些任务是优化某些数据挖掘模型的步骤(但彼此独立),并且大部分时间都在一些C扩展函数中,因此如果我试图优雅地杀死它们,它们就不会合作.
是否存在适合该模式的分布式任务队列--- AFAIK:celery允许中止愿意合作的任务.但我可能错了.
我最近在多线程环境中询问了关于在Python中杀死挂起函数的类似问题.
我想我可以将celery任务子类化,因此它会生成一个新进程,然后执行其有效负载,如果需要很长时间就会中止它的执行,但之后我会被初始化新解释器的开销所杀死.
如果这是一个愚蠢的问题,我道歉并将羞辱我的头,但是:
我正在使用rq在Python中排队作业.我希望它像这样工作:
我的代码到目前为止:
redis_conn = Redis()
use_connection(redis_conn)
q = Queue('normal', connection=redis_conn) # this is terrible, I know - fixing later
w = Worker(q)
job = q.enqueue(getlinksmod.lsGet, theURL,total,domainid)
w.work()
Run Code Online (Sandbox Code Playgroud)
我认为我最好的解决方案是拥有2名工人,一名为工作A,一名为B工作.工作B工人可以监督工作A,当工作A完成时,开始工作B.
我无法想象拯救我的生命是我如何让一个工人监视另一个人的状态.我可以通过job.id从作业A中获取作业ID.我可以使用w.name获取工作者名称.但对于我如何将任何信息传递给其他工作人员并不是最模糊的.
或者,有一个更简单的方法来做到这一点,我完全失踪了?
我正在编写一个烧瓶应用程序,要求用户上传 excel 电子表格,然后计算并填充数据库。我试图通过Redis RQ在后台进行处理部分,但我不断收到TypeError: cannot serialize '_io.TextIOWrapper' object my代码如下所示:
from redis import Redis
from rq import Queue
from rq.job import Job
import xlrd as x
workbook = x.open_workbook('data.xls')
sheet = workbook.sheet_by_index(0)
q = Queue(connection = Redis())
def populate(sheet,row,column):
#extract data and save into database
job = enqueue_call(func=populate, args=(sheet,7,5), result_ttl = 5000)
print(job.get_id())
Run Code Online (Sandbox Code Playgroud) 我有一个应用程序可以同时从多个用户接收三种类型的消息,我正在使用 3 个队列/消费者和 3 个交换器处理它。我的问题是,当队列中有来自一个用户的数千条消息时,其他用户正在等待。
我正在寻找一种解决方案来并行执行每个用户的作业。我可以为每个用户动态创建队列,但这不是一个好的解决方案,因为会有数百个队列和使用者。如何自动删除空闲的消费者/队列。我可以以任何方式使用 celery 和 redis 来解决这个问题吗?