我有一个有向无环图networkx。每个节点代表一个任务,节点的前驱是任务依赖项(给定任务在其依赖项执行之前无法执行)。
我想在异步任务队列中“执行”图表,类似于提供的celery服务(以便我可以轮询作业的状态、检索结果等)。Celery 不提供创建 DAG 的能力(据我所知),并且在所有依赖项完成后立即转移到 a 的能力task将是至关重要的(DAG 可能有多个路径,即使一个任务很慢/阻塞) ,可能会继续执行其他任务等)。
有没有任何简单的例子说明我如何实现这一目标,或者甚至networkx与集成celery?
我有一个Flask应用程序,其注册如下:
APP = Flask(__name__)
APP.config.from_object('config')
Run Code Online (Sandbox Code Playgroud)
我为URL定义了一个视图,在该视图中调用了一个函数,该函数与DB交互。
from tasks import some_func
.
.
.
some_func.delay(params)
Run Code Online (Sandbox Code Playgroud)
在task.py文件中,我正在创建Celery实例,如下所示:
# Config values don't work here
celery = Celery('tasks', broker='amqp://', backend='amqp://')
.
.
.
@celery.task()
def some_func():
#DB interactions
Run Code Online (Sandbox Code Playgroud)
现在我得到一个错误,指出:
RuntimeError: Working outside of application context.
Run Code Online (Sandbox Code Playgroud)
我了解了应用程序上下文以及如何使用它们。我已经导入current_app我的tasks.py文件,并使用上下文如下尝试:
@celery.task()
def some_func():
with current_app.app_context():
#DB interactions
Run Code Online (Sandbox Code Playgroud)
但是我仍然收到相同的错误。我还尝试从主文件中推送上下文,如下所示:
ctx = APP.app_context()
ctx.push()
Run Code Online (Sandbox Code Playgroud)
但是还没有运气。
如何使Celery与Flask配合使用?
有没有办法确保GAE上的任务队列的FIFO(先进先出)行为?
GAE文档说FIFO是影响任务执行顺序的因素之一,但是相同的文档说"系统的调度可以'将新任务"跳到队列的头部"并且我已经通过测试确认了这种行为.效果:我的事件正在按顺序处理.
文件说:
https://developers.google.com/appengine/docs/java/taskqueue/overview-push
执行任务的顺序取决于几个因素:
任务在队列中的位置.App Engine尝试基于FIFO>(先进先出)顺序处理任务.通常,任务被插入到队列的末尾,并从队列的头部执行.
队列中的任务积压.系统尝试通过对调度程序的特别优化的通知,为任何给定任务提供尽可能低的延迟.因此,在队列具有大量积压任务的情况下,系统的调度可以将新任务"跳转"到队列的头部.
任务的etaMillis属性的值.此属性指定任务可以执行的最早时间.App Engine始终等待指定的ETA处理推送任务.
任务的countdownMillis属性的值.此属性指定在执行任务之前等待的最小秒数.倒计时和eta是互斥的; 如果指定一个,请不要指定另一个.
我需要做什么?在我的用例中,我将每天处理1-2万个来自车辆的事件.这些事件可以任何间隔(1秒,1分钟或1小时)发送.必须确保事件处理的顺序.我需要按时间戳顺序处理,这是在车辆内部的嵌入式设备上生成的.
我现在有什么?
由使用者调用并创建任务的Rest servlet(事件数据在有效负载上).
在此之后,工作者servlet获得此任务并且:
反序列化事件数据;
将事件放在数据存储区中;
更新数据存储上的车辆.
那么,有没有什么方法可以确保FIFO行为?或者我怎样才能改进这个解决方案呢?
我目前正在将一个长时间运行的作业卸载到TaskQueue来计算数据存储区中NDB实体之间的连接.
基本上,此队列处理几个实体键列表,这些实体键与节点中query的node_in_connected_nodes函数相关联GetConnectedNodes:
class GetConnectedNodes(object):
"""Class for getting the connected nodes from a list of nodes in a paged way"""
def __init__(self, list, query):
# super(GetConnectedNodes, self).__init__()
self.nodes = [ndb.model.Key('Node','%s' % x) for x in list]
self.cursor = 0
self.MAX_QUERY = 100
# logging.info('Max query - %d' % self.MAX_QUERY)
self.max_connections = len(list)
self.connections = deque()
self.query=query
def node_in_connected_nodes(self):
"""Checks if a node exists in the connected nodes of the next node in the
node list. …Run Code Online (Sandbox Code Playgroud) python google-app-engine memory-leaks task-queue app-engine-ndb
试图找到一种捕获RQ作业超时的好方法,因此可以在超时后重新排队.
基本上,正确的解决方案将提供一种方法(例如,工作者中的异常处理程序或类似的东西)来重新排队超时的作业.此外,如果作业返回failed队列,那也是一个很好的答案.
非常感谢!任何帮助将不胜感激!
我正在使用Django进行新的Web项目.它基本上是Quora克隆.可以说我在它下面有一个问题和Upvote按钮.如果大约8-9k的人同时点击那个Upvote按钮(同一个问题),我该如何处理呢?我使用Nginx作为前端服务器而Apache作为后端服务器?我应该用什么来处理这么多的负载?
我正在尝试解决以下问题:
棘手的部分:我想高效地完成所有这些(即最小延迟并使用尽可能多的并行 API 调用 - 不超过限制),但同时尝试防止从“生成大量任务” “大”原始任务延迟“小”原始任务生成的任务。
换句话说:我希望为每个任务分配一个“优先级”,其中“小”任务具有更高的优先级,从而防止“大”任务导致饥饿。
一些搜索似乎并没有表明任何预制的东西可用,所以我想出了以下内容:
tasks-small, tasks-medium,tasks-largetasks-small为max_concurrent_requests30、60tasks-medium和tasks-large100)例如,如果我们有T1一个小任务的一部分的任务,首先检查是否tasks-small有空闲的“槽”并将其排入队列。否则检查tasks-medium和tasks-large。如果它们都没有空闲插槽,则tasks-small无论如何将其排队,并且它将在处理之前添加的任务之后进行处理(注意:这不是最佳选择,因为如果“插槽”在其他队列上释放,它们仍然不会处理队列中待处理的任务tasks-small)
另一种选择是使用 PULL 队列,并让一个中央“协调器”根据优先级从该队列中拉取并分派它们,但这似乎会增加一点延迟。
然而,这似乎有点老套,我想知道是否有更好的选择。
编辑:经过一些想法和反馈后,我正在考虑通过以下方式使用 PULL 队列:
medium-tasks和large-tasks …是否可以以编程方式查询Task Queue API以查看当前正在执行/挂起的任务数量?
我没有看到任何方法在API中执行此操作,因此我使用在数据存储区中创建对象来表示排队的任务.运行时,任务会从数据存储中删除相应的条目.
可以想象,这很容易失去同步.我真的很高兴能够在队列中获得给定队列名称的简单任务计数.
如果这是一个愚蠢的问题,我道歉并将羞辱我的头,但是:
我正在使用rq在Python中排队作业.我希望它像这样工作:
我的代码到目前为止:
redis_conn = Redis()
use_connection(redis_conn)
q = Queue('normal', connection=redis_conn) # this is terrible, I know - fixing later
w = Worker(q)
job = q.enqueue(getlinksmod.lsGet, theURL,total,domainid)
w.work()
Run Code Online (Sandbox Code Playgroud)
我认为我最好的解决方案是拥有2名工人,一名为工作A,一名为B工作.工作B工人可以监督工作A,当工作A完成时,开始工作B.
我无法想象拯救我的生命是我如何让一个工人监视另一个人的状态.我可以通过job.id从作业A中获取作业ID.我可以使用w.name获取工作者名称.但对于我如何将任何信息传递给其他工作人员并不是最模糊的.
或者,有一个更简单的方法来做到这一点,我完全失踪了?
task-queue ×10
python ×6
celery ×2
django ×2
java ×2
flask ×1
memory-leaks ×1
networkx ×1
nginx ×1
python-huey ×1
python-rq ×1
queue ×1
redis ×1