我们有一个基于rabbitMQ和的分布式架构Celery.我们可以并行启动多个任务而不会出现任何问题.可扩展性很好.
现在我们需要远程控制任务:PAUSE,RESUME,CANCEL.我们发现的唯一解决方案是在Celery任务中对DB请求后回复命令的另一个任务进行RPC调用.Celery任务和RPC任务不在同一台机器上,只有RPC任务可以访问数据库.
您是否有任何建议如何改进它并轻松地与正在进行的任务沟通?谢谢
编辑: 
事实上,我们想做的事情如下图所示.这很容易进行Blue配置或者Orange,但是我们不知道如何同时进行这两种配置.
 工人正在订阅一个共同点
工人正在订阅一个共同点Jobs queue,每个工人都有自己Admin queue在交易所申报的.
编辑: 
如果这是不可能的Celery,我愿意接受其他框架的解决方案python-rq.
我正在使用RQ,我有一个failed包含数千个项目的队列,另一个test队列我创建了一段时间用于测试现在是空的和未使用的.我想知道如何从failed队列中删除所有作业,并test完全删除队列?
试图用来python-rq支持我们的Web应用程序的后端,但推动新的工作需要很长时间 - 最多12秒.
执行enqueue_call函数调用时会发生性能损失,特别是当连接到系统的工作进程数增加(超过200)时.  
系统工作原理如下:
enqueue_call除了要执行的函数的实际参数之外,它还使用函数将参数传递给作业(例如timeout和ttl).screen.工作人员遵循文档中提供的模式,执行Worker.work()无限循环函数来侦听队列.关于基础设施:
redis-benchmark具有任务队列的服务器上运行时,对于大多数基准测试,我们得到的结果平均超过20000 r/s.在这种情况下,我们如何才能提高新工作的推动绩效?我们应该使用更好的模式吗?
我想用rq排队我的ml预测.示例代码(pesudo-ish):
predict.py:
import tensorflow as tf
def predict_stuff(foo):
    model = tf.load_model()
    result = model.predict(foo)
    return result
app.py:
from rq import Queue
from redis import Redis
from predict import predict_stuff
q = Queue(connection=Redis())
for foo in baz:
    job = q.enqueue(predict_stuff, foo)
worker.py:
import sys
from rq import Connection, Worker
# Preload libraries
import tensorflow as tf
with Connection():
    qs = sys.argv[1:] or ['default']
    w = Worker(qs)
    w.work()
我已经阅读了rq文档,解释说你可以预加载库以避免每次运行作业时都导入它们(因此在示例代码中我在worker代码中导入tensorflow).但是,我还希望移动模型加载,predict_stuff以避免每次工作人员运行作业时加载模型.我该怎么办呢?
我将在我的项目中开始使用django-rq.
Django与基于Redis的Python排队库RQ集成.
测试使用RQ的django应用程序的最佳做法是什么?
例如,如果我想将我的应用程序测试为黑盒子,那么在用户执行某些操作后,我想执行当前队列中的所有作业,然后检查我的数据库中的所有结果.我怎么能在我的django测试中做到这一点?
我正在使用django-redis和django_rq框架来支持我在Heroku上的Django应用程序的redis缓存和redis后台任务处理.它在过去很顺利,但是现在DatabaseError SSL error: decryption failed or bad record mac每次我的一个工作都运行起来我都会得到一个.
我在https://devcenter.heroku.com/articles/postgres-logs-errors文章中看到这个错误通常与Postgres发生 ,但它并没有给我任何有用的python设置.
如何将作业结果传递给依赖它的作业?
我目前所做的是将第一份工作的ID传递给第二份工作,
first = queue.enqueue(firstJob)
second = queue.enqueue(secondJob, first.id, depends_on=first);
并在内部secondJob获取第一份工作以获得结果
first = queue.fetch_job(previous_job_id)
print first.result
这是推荐的方式吗?有没有其他模式可以用来直接将第一个作业的结果传递给第二个?
我正在尝试使用python-rq将redis中的基本作业排入队列,但它会抛出此错误
"ValueError:工作人员无法处理主模块的功能"
import requests
def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())
from rq import Connection, Queue
from redis import Redis
redis_conn = Redis()
q = Queue(connection=redis_conn)
job = q.enqueue(count_words_at_url, 'http://nvie.com')
print job
我们最近被迫用RQ取代芹菜,因为它更简单,芹菜给我们带来太多问题.现在,我们无法找到一种动态创建多个队列的方法,因为我们需要同时完成多个作业.因此,基本上我们对其中一条路线的每个请求都应该启动一项工作,让多个用户等待一个用户的工作完成后才能继续下一个工作是没有意义的.我们会定期向服务器发送请求,以获取作业状态和一些元数据.这样我们就可以使用进度条更新用户(这可能是一个漫长的过程,所以必须为了UX而这样做)
我们正在使用Django和Python的rq库.我们没有使用django-rq(如果使用它有优势,请告诉我)
到目前为止,我们在一个控制器中启动了一项任务,例如:
redis_conn = Redis()
q = Queue(connection=redis_conn)  
job = django_rq.enqueue(render_task, new_render.pk, domain=domain, data=csv_data, timeout=1200)
然后在我们的render_task方法中,我们根据长任务的状态将元数据添加到作业:
current_job = get_current_job()
current_job.meta['state'] = 'PROGRESS'
current_job.meta['process_percent'] = process_percent
current_job.meta['message'] = 'YOUTUBE'
current_job.save()
现在我们有另一个端点获取当前任务及其元数据并将其传递回客户端(这通过oeriodic AJAX请求发生)
我们如何在不阻碍其他工作的情况下同时运行工作?我们应该动态制作队列吗?有没有办法利用工人来实现这一目标?
python-rq ×10
python ×8
redis ×6
django ×3
celery ×2
task-queue ×2
asynchronous ×1
heroku ×1
postgresql ×1
rabbitmq ×1
testing ×1