我正在开发一个新的监控系统,该系统可以测量 Celery 队列吞吐量,并在队列备份时帮助向团队发出警报。在我的工作过程中,我遇到了一些我不理解的奇怪行为(并且在 Celery 规范中没有详细记录)。
出于测试目的,我设置了一个端点,该端点将使用 16 个可用于模拟备份队列的长时间运行的任务填充队列。框架是Flask,队列代理是Redis。Celery 配置为每个工作人员可以并行处理最多 4 个任务,而我有 2 个工作人员正在运行。
api/health.py
def health():
health = Blueprint("health", __name__)
@health.route("/api/debug/create-long-queue", methods=["GET"])
def long_queue():
for i in range(16):
sleepy_job.delay()
return make_response({}, 200)
return health
Run Code Online (Sandbox Code Playgroud)
工作.py
@celery.task(priority=HIGH_PRIORITY)
def sleepy_job(*args, **kwargs):
time.sleep(30)
Run Code Online (Sandbox Code Playgroud)
以下是我模拟备份生产队列的方法:
/api/debug/create-long-queue来模拟队列中的备份。根据上面的计算,每个工人应该忙着睡觉 1 分钟(总共可以同时处理 8 个任务。每个任务只休眠 30 秒,总共 16 个任务。)handle_incoming_message。这是我看到的使用花来检查队列的内容:
sleepy_job不到新任务的迹象 ,尽管我确信第二个 API 调用已被调用。handle_incoming_messagehandle_incoming_message.delay()sleepy_job任务完成后(约 30 秒),我 …我已经阅读了其他所有线程并尝试了所有内容,但确定不会点击"构建和存档"功能.
我正在运行OSX 10.6.7和Xcode 3.2.4.我正在开发一个MAC应用程序,我正在尝试将其提交到Mac App Store,但为了做到这一点,我需要运行Build and Archive功能,而我根本无法弄清楚如何让它运行.我从其他一些帖子中听说过这些"开发者工具1.1",但也找不到.如果您即将说这将解决问题,那么请链接我,否则,任何建议都非常感谢.
PS是即时发布模式
我有一个运行 SQLAlchemy 和 PostgresQL 的 Flask 应用程序来处理大量数据。我们在前端显示的一件事是一个仪表板,其中包含给定组织的几个汇总统计信息。最近这个端点运行得非常缓慢,所以我一直在尝试优化它并提高性能。
我首先对 BaseQuery 进行子类化,并在.count()不使用子查询的情况下实现内置的 SQLAlchemy 的精简版本。
优化查询
from sqlalchemy import func
from sqlalchemy.orm import lazyload
from flask_sqlalchemy import BaseQuery
class OptimisedQuery(BaseQuery):
def optimised_count(query):
count_query = query.options(lazyload('*')).statement.with_only_columns([func.count()]).order_by(None)
return query.session.execute(count_query).scalar()
Run Code Online (Sandbox Code Playgroud)
api/dashboard.py
@dashboards.route("/api/dashboard/stats", methods=["GET"])
@authentication_required
def stats(current_user):
org = current_user.organization
total_subscribers = Subscriber.query.filter_by(
unsubscribed=False, organization=org
).optimised_count()
total_conversations = SubscriberConversationState.query.filter_by(
organization=org
).optimised_count()
total_messages = Message.query.filter_by(
organization=org
).optimised_count()
total_unsubscribers = Subscriber.query.filter_by(
unsubscribed=True, organization=org
).optimised_count()
return jsonify(
dict(
total_subscribers=total_subscribers,
total_conversations=total_conversations,
total_messages=total_messages,
total_unsubscribers=total_unsubscribers,
)
)
Run Code Online (Sandbox Code Playgroud)
这绝对是朝着正确方向迈出的一步,显着降低了端点延迟。话虽如此,加载仍然需要 9-15 秒,所以我深入到 …