标签: task-queue

Networkx 作为任务队列?

我有一个有向无环图networkx。每个节点代表一个任务,节点的前驱是任务依赖项(给定任务在其依赖项执行之前无法执行)。

我想在异步任务队列中“执行”图表,类似于提供的celery服务(以便我可以轮询作业的状态、检索结果等)。Celery 不提供创建 DAG 的能力(据我所知),并且在所有依赖项完成后立即转移到 a 的能力task将是至关重要的(DAG 可能有多个路径,即使一个任务很慢/阻塞) ,可能会继续执行其他任务等)。

有没有任何简单的例子说明我如何实现这一目标,或者甚至networkx与集成celery

python task-queue networkx celery directed-acyclic-graphs

8
推荐指数
1
解决办法
1441
查看次数

芹菜烧瓶-无法使用应用程序上下文

我有一个Flask应用程序,其注册如下:

APP = Flask(__name__)
APP.config.from_object('config')
Run Code Online (Sandbox Code Playgroud)

我为URL定义了一个视图,在该视图中调用了一个函数,该函数与DB交互。

from tasks import some_func
.
.
.
some_func.delay(params)
Run Code Online (Sandbox Code Playgroud)

在task.py文件中,我正在创建Celery实例,如下所示:

# Config values don't work here
celery = Celery('tasks', broker='amqp://', backend='amqp://')
.
.
.
@celery.task()
def some_func():
    #DB interactions
Run Code Online (Sandbox Code Playgroud)

现在我得到一个错误,指出:

RuntimeError: Working outside of application context.
Run Code Online (Sandbox Code Playgroud)

我了解了应用程序上下文以及如何使用它们。我已经导入current_app我的tasks.py文件,并使用上下文如下尝试:

@celery.task()
def some_func():
    with current_app.app_context():
        #DB interactions
Run Code Online (Sandbox Code Playgroud)

但是我仍然收到相同的错误。我还尝试从主文件中推送上下文,如下所示:

ctx = APP.app_context()
ctx.push()
Run Code Online (Sandbox Code Playgroud)

但是还没有运气。

如何使Celery与Flask配合使用?

注意:我已经在这里尝试了他们的例子

python message-queue task-queue celery flask

8
推荐指数
1
解决办法
3339
查看次数

有没有办法确保GAE上的任务队列的FIFO(先进先出)行为?

有没有办法确保GAE上的任务队列的FIFO(先进先出)行为?

GAE文档说FIFO是影响任务执行顺序的因素之一,但是相同的文档说"系统的调度可以'将新任务"跳到队列的头部"并且我已经通过测试确认了这种行为.效果:我的事件正在按顺序处理.

文件说:

https://developers.google.com/appengine/docs/java/taskqueue/overview-push

执行任务的顺序取决于几个因素:

任务在队列中的位置.App Engine尝试基于FIFO>(先进先出)顺序处理任务.通常,任务被插入到队列的末尾,并从队列的头部执行.

队列中的任务积压.系统尝试通过对调度程序的特别优化的通知,为任何给定任务提供尽可能低的延迟.因此,在队列具有大量积压任务的情况下,系统的调度可以将新任务"跳转"到队列的头部.

任务的etaMillis属性的值.此属性指定任务可以执行的最早时间.App Engine始终等待指定的ETA处理推送任务.

任务的countdownMillis属性的值.此属性指定在执行任务之前等待的最小秒数.倒计时和eta是互斥的; 如果指定一个,请不要指定另一个.

我需要做什么?在我的用例中,我将每天处理1-2万个来自车辆的事件.这些事件可以任何间隔(1秒,1分钟或1小时)发送.必须确保事件处理的顺序.我需要按时间戳顺序处理,这是在车辆内部的嵌入式设备上生成的.

我现在有什么?

  1. 由使用者调用并创建任务的Rest servlet(事件数据在有效负载上).

  2. 在此之后,工作者servlet获得此任务并且:

    • 反序列化事件数据;

    • 将事件放在数据存储区中;

    • 更新数据存储上的车辆.

那么,有没有什么方法可以确保FIFO行为?或者我怎样才能改进这个解决方案呢?

java google-app-engine task-queue

7
推荐指数
1
解决办法
2098
查看次数

NDB在长时间请求期间不清除内存

我目前正在将一个长时间运行的作业卸载到TaskQueue来计算数据存储区中NDB实体之间的连接.

基本上,此队列处理几个实体键列表,这些实体键与节点中querynode_in_connected_nodes函数相关联GetConnectedNodes:

class GetConnectedNodes(object):
"""Class for getting the connected nodes from a list of nodes in a paged way"""
def __init__(self, list, query):
    # super(GetConnectedNodes, self).__init__()
    self.nodes = [ndb.model.Key('Node','%s' % x) for x in list]
    self.cursor = 0
    self.MAX_QUERY = 100
    # logging.info('Max query - %d' % self.MAX_QUERY)
    self.max_connections = len(list)
    self.connections = deque()
    self.query=query

def node_in_connected_nodes(self):
    """Checks if a node exists in the connected nodes of the next node in the 
       node list. …
Run Code Online (Sandbox Code Playgroud)

python google-app-engine memory-leaks task-queue app-engine-ndb

7
推荐指数
1
解决办法
1095
查看次数

huey消费者配置在哪里?

我正在尝试为python运行Huey任务队列(这是Celery的替代方案)而且我坚持用main.Configuration启动使用者(正如它在教程中所写的那样).我知道huey_consumer正在python中的某个地方寻找配置文件,但是根据教程我不能让它工作,我不知道为什么我应该编写配置文件并将其作为模块(而不是文件)加载.

当我运行huey_consumer.py main.Configuration它返回Unable to import "main".

休伊也把这个问题写成了他们共同的问题,但它也没有真正帮助.

如果有人使用Huey,请帮助我.

python django task-queue python-huey

7
推荐指数
1
解决办法
1066
查看次数

如何在Python中正确捕获和处理RQ超时?

试图找到一种捕获RQ作业超时的好方法,因此可以在超时后重新排队.

基本上,正确的解决方案将提供一种方法(例如,工作者中的异常处理程序或类似的东西)来重新排队超时的作业.此外,如果作业返回failed队列,那也是一个很好的答案.

非常感谢!任何帮助将不胜感激!

python queue task-queue redis python-rq

7
推荐指数
1
解决办法
1534
查看次数

Django中的并发加载处理

我正在使用Django进行新的Web项目.它基本上是Quora克隆.可以说我在它下面有一个问题和Upvote按钮.如果大约8-9k的人同时点击那个Upvote按钮(同一个问题),我该如何处理呢?我使用Nginx作为前端服务器而Apache作为后端服务器?我应该用什么来处理这么多的负载?

django load-balancing nginx task-queue

7
推荐指数
1
解决办法
320
查看次数

在 Google App Engine 上使用任务队列时如何确定任务的优先级?

我正在尝试解决以下问题:

  1. 我有一系列想要执行的“任务”
  2. 我有固定数量的工作人员来执行这些工作人员(因为它们使用 urlfetch 调用外部 API,并且对此 API 的并行调用数量有限)
  3. 我希望这些“任务”能够“尽快”执行(即最小延迟)
  4. 这些任务是较大任务的一部分,可以根据原始任务的大小进行分类(即,小型原始任务可能生成 1 到 100 个任务,中型任务可能生成 100 到 1000 个任务,大型​​任务可能生成超过 1000 个任务)。

棘手的部分:我想高效地完成所有这些(即最小延迟并使用尽可能多的并行 API 调用 - 不超过限制),但同时尝试防止从“生成大量任务” “大”原始任务延迟“小”原始任务生成的任务。

换句话说:我希望为每个任务分配一个“优先级”,其中“小”任务具有更高的优先级,从而防止“大”任务导致饥饿。

一些搜索似乎并没有表明任何预制的东西可用,所以我想出了以下内容:

  • 创建三个推送队列:tasks-small, tasks-medium,tasks-large
  • 为每个请求设置最大并发请求数,使总数达到并发 API 调用的最大数量(例如,如果最大并发 API 调用数为 200,我可以将其设置tasks-smallmax_concurrent_requests30、60tasks-mediumtasks-large100)
  • 排队任务时,检查编号。每个队列中待处理的任务(使用类似 QueueStatistics 类的东西),并且,如果其他队列未 100% 使用,则将任务排入其中,否则只需将任务排入具有相应大小的队列中。

例如,如果我们有T1一个小任务的一部分的任务,首先检查是否tasks-small有空闲的“槽”并将其排入队列。否则检查tasks-mediumtasks-large。如果它们都没有空闲插槽,则tasks-small无论如何将其排队,并且它将在处理之前添加的任务之后进行处理(注意:这不是最佳选择,因为如果“插槽”在其他队列上释放,它们仍然不会处理队列中待处理的任务tasks-small

另一种选择是使用 PULL 队列,并让一个中央“协调器”根据优先级从该队列中拉取并分派它们,但这似乎会增加一点延迟。

然而,这似乎有点老套,我想知道是否有更好的选择。


编辑:经过一些想法和反馈后,我正在考虑通过以下方式使用 PULL 队列:

  • 有两个 PULL 队列 (medium-taskslarge-tasks …

google-app-engine task-queue

7
推荐指数
1
解决办法
2674
查看次数

Google App Engine(Java)TaskQueue API:如何查询正在运行/挂起的任务数量?

是否可以以编程方式查询Task Queue API以查看当前正在执行/挂起的任务数量?

我没有看到任何方法在API中执行此操作,因此我使用在数据存储区中创建对象来表示排队的任务.运行时,任务会从数据存储中删除相应的条目.

可以想象,这很容易失去同步.我真的很高兴能够在队列中获得给定队列名称的简单任务计数.

java google-app-engine task-queue

6
推荐指数
1
解决办法
872
查看次数

Python/rq - 监视工作者状态

如果这是一个愚蠢的问题,我道歉并将羞辱我的头,但是:

我正在使用rq在Python中排队作业.我希望它像这样工作:

  1. 工作A开始.作业A通过Web API抓取数据并存储它.
  2. 工作A运行.
  3. 工作A完成.
  4. 完成A后,作业B开始.作业B检查作业A存储的每个记录,并添加一些其他响应数据.
  5. 完成作业B后,用户会收到一封快乐的电子邮件,说明他们的报告准备就绪.

我的代码到目前为止:

redis_conn = Redis()
use_connection(redis_conn)
q = Queue('normal', connection=redis_conn) # this is terrible, I know - fixing later
w = Worker(q)
job = q.enqueue(getlinksmod.lsGet, theURL,total,domainid)
w.work()
Run Code Online (Sandbox Code Playgroud)

我认为我最好的解决方案是拥有2名工人,一名为工作A,一名为B工作.工作B工人可以监督工作A,当工作A完成时,开始工作B.

我无法想象拯救我的生命是我如何让一个工人监视另一个人的状态.我可以通过job.id从作业A中获取作业ID.我可以使用w.name获取工作者名称.但对于我如何将任何信息传递给其他工作人员并不是最模糊的.

或者,有一个更简单的方法来做到这一点,我完全失踪了?

python task-queue

6
推荐指数
2
解决办法
3899
查看次数